[GitHub] [kafka] bbejeck commented on pull request #8789: MINOR: Fix the javadoc broken links of streams
bbejeck commented on pull request #8789: URL: https://github.com/apache/kafka/pull/8789#issuecomment-638886703 Ok to test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on pull request #8789: MINOR: Fix the javadoc broken links of streams
bbejeck commented on pull request #8789: URL: https://github.com/apache/kafka/pull/8789#issuecomment-638945282 Java 11 failed with `java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached`
[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness
ableegoldman commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r435374125

## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java ##
@@ -300,20 +302,71 @@ public void shouldNotHaveUnfulfilledQuotaWhenActiveTaskSizeGreaterEqualThanCapac
     @Test
     public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() {
         final Map taskOffsetSums = Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
-        client.addPreviousTasksAndOffsetSums(taskOffsetSums);
+        client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
         client.initializePrevTasks(Collections.emptyMap());
         assertThat(client.prevActiveTasks(), equalTo(Collections.singleton(TASK_0_1)));
         assertThat(client.previousAssignedTasks(), equalTo(Collections.singleton(TASK_0_1)));
         assertTrue(client.prevStandbyTasks().isEmpty());
     }

+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumer() {
+        client.addPreviousTasksAndOffsetSums("c1", Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET));
+        client.addPreviousTasksAndOffsetSums("c2", Collections.singletonMap(TASK_0_2, 0L));
+        client.addPreviousTasksAndOffsetSums("c3", Collections.emptyMap());
+
+        client.initializePrevTasks(Collections.emptyMap());
+        client.computeTaskLags(
+            UUID_1,
+            mkMap(
+                mkEntry(TASK_0_1, 1_000L),
+                mkEntry(TASK_0_2, 1_000L)
+            )
+        );
+
+        assertThat(client.previousTasksForConsumer("c1"), equalTo(mkSortedSet(TASK_0_1)));
+        assertThat(client.previousTasksForConsumer("c2"), equalTo(mkSortedSet(TASK_0_2)));
+        assertTrue(client.previousTasksForConsumer("c3").isEmpty());
+    }
+
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumerWhenLagIsNotComputed() {
+        client.addPreviousTasksAndOffsetSums("c1", Collections.singletonMap(TASK_0_1, 1000L));
+        client.initializePrevTasks(Collections.emptyMap());
+
+        assertThat(client.previousTasksForConsumer("c1"), equalTo(mkSortedSet(TASK_0_1)));
+    }
+
+    @Test
+    public void shouldReturnPreviousStatefulTasksForConsumerInIncreasingLagOrder() {

Review comment:
You didn't miss it, I just snuck it in there after your review :P Sorry, should have called out that I made some more changes. I think that was the only significant logical change though. I'll try pulling the sort out into the assignment code.
[GitHub] [kafka] mimaison merged pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
mimaison merged pull request #8311: URL: https://github.com/apache/kafka/pull/8311
[jira] [Commented] (KAFKA-10100) LiveLeaders field in LeaderAndIsrRequest is not used anymore
[ https://issues.apache.org/jira/browse/KAFKA-10100?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17125984#comment-17125984 ]

Ismael Juma commented on KAFKA-10100:
-------------------------------------

I had noticed this too. I was wondering if it's worth doing this given that KIP-500 will change how we propagate metadata. If there's a concrete performance benefit, then we should do it, of course.

> LiveLeaders field in LeaderAndIsrRequest is not used anymore
> ------------------------------------------------------------
>
> Key: KAFKA-10100
> URL: https://issues.apache.org/jira/browse/KAFKA-10100
> Project: Kafka
> Issue Type: Improvement
> Reporter: David Jacot
> Assignee: David Jacot
> Priority: Major
>
> We have noticed that the `LiveLeaders` field in the LeaderAndIsrRequest is not used anywhere but still populated by the controller.
> It seems that that field was introduced in AK `0.8.0` and was supposed to be removed in AK `0.8.1`:
> [https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/cluster/Partition.scala#L194.]
> I think that we can safely deprecate the field and stop populating it for all versions > 0.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10101) recovery point is advanced without flushing the data after recovery
Jun Rao created KAFKA-10101:
----------------------------

Summary: recovery point is advanced without flushing the data after recovery
Key: KAFKA-10101
URL: https://issues.apache.org/jira/browse/KAFKA-10101
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 2.5.0
Reporter: Jun Rao

Currently, in Log.recoverLog(), we set recoveryPoint to logEndOffset after recovering the log segment. However, we don't flush the log segments after recovery. The potential issue is that if the broker has another hard failure, segments may be corrupted on disk but won't be going through recovery on another restart.

This logic was introduced in KAFKA-5829 since 1.0.0.
[jira] [Commented] (KAFKA-10090) Misleading warnings: The configuration was supplied but isn't a known config
[ https://issues.apache.org/jira/browse/KAFKA-10090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126067#comment-17126067 ]

Chia-Ping Tsai commented on KAFKA-10090:
----------------------------------------

[~rwruck] Are you working on it? I can take this over if you have no free time :)

> Misleading warnings: The configuration was supplied but isn't a known config
> ----------------------------------------------------------------------------
>
> Key: KAFKA-10090
> URL: https://issues.apache.org/jira/browse/KAFKA-10090
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Affects Versions: 2.5.0
> Reporter: Robert Wruck
> Priority: Major
>
> In our setup (using Spring cloud stream Kafka binder), we see log messages like:
>
> {{The configuration 'ssl.keystore.password' was supplied but isn't a known config}}
>
> logged by org.apache.kafka.clients.admin.AdminClientConfig. The Kafka binder actually uses SSL and security.protocol is set to SSL.
> Looking through the code, a few things seem odd:
> * The log message says "isn't a known config" but that's not true. It is *known*, i.e. defined in ConfigDef, but not *used*.
> * The method for detecting whether a config is actually *used* is not complete. ChannelBuilders.channelBuilderConfigs() for example extracts the configs to use for the created channel builder using *new HashMap(config.values())* thus *get()* won't mark a config as used anymore.
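The second point in the report can be illustrated with a small sketch. `TrackedConfig` below is a hypothetical stand-in, not Kafka's `AbstractConfig`; it only assumes the behaviour the ticket describes, namely that a key counts as "used" when it is read through `get()`. Reads on a copied map bypass that bookkeeping, so the key is still reported even though it was consumed.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a config class that tracks which keys were read.
// It is NOT Kafka's AbstractConfig; it only mimics the behaviour described in the ticket.
class TrackedConfig {
    private final Map<String, Object> values;
    private final Set<String> used = new HashSet<>();

    TrackedConfig(Map<String, ?> values) {
        this.values = new HashMap<>(values);
    }

    // Reading through get() marks the key as used.
    Object get(String key) {
        used.add(key);
        return values.get(key);
    }

    // Returns a copy of the raw values; reads on the copy bypass the tracking above.
    Map<String, Object> values() {
        return new HashMap<>(values);
    }

    // Keys that were supplied but never read through get().
    Set<String> unused() {
        Set<String> result = new HashSet<>(values.keySet());
        result.removeAll(used);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("security.protocol", "SSL");
        props.put("ssl.keystore.password", "secret");
        TrackedConfig config = new TrackedConfig(props);

        config.get("security.protocol");               // tracked read
        Map<String, Object> copy = new HashMap<>(config.values());
        copy.get("ssl.keystore.password");             // untracked read on the copy

        // The password key is still reported as unused although it was read from the copy.
        System.out.println(config.unused());
    }
}
```

One possible direction, again only as an assumption, is to hand out a map view whose `get()` delegates to the tracking accessor instead of a plain copy.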
[jira] [Commented] (KAFKA-10097) Avoid getting null map for task checkpoint
[ https://issues.apache.org/jira/browse/KAFKA-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126001#comment-17126001 ]

Boyang Chen commented on KAFKA-10097:
-------------------------------------

[~feyman] Thank you for the interest! Unfortunately this ticket is still under discussion; I will let you know the result. Meanwhile I will look for other side improvements for Streams to get you started :)

> Avoid getting null map for task checkpoint
> ------------------------------------------
>
> Key: KAFKA-10097
> URL: https://issues.apache.org/jira/browse/KAFKA-10097
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Boyang Chen
> Priority: Major
>
> In StreamTask, we have the logic to generate a checkpoint offset map to be materialized through StateManager#checkpoint. This map can be either an empty map or null: the former indicates that we should only pull down existing state store checkpoint data, while the latter indicates that no checkpoint is needed, for example when we are suspending a task.
> Having two similar special cases for checkpointing could lead to unexpected bugs. We should also think about separating the empty checkpoint case from the passed-in checkpoint case.
[jira] [Commented] (KAFKA-10101) recovery point is advanced without flushing the data after recovery
[ https://issues.apache.org/jira/browse/KAFKA-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126036#comment-17126036 ]

Jun Rao commented on KAFKA-10101:
---------------------------------

If this is an issue, one way to fix that is to simply not advance recoveryPoint after recovery.

> recovery point is advanced without flushing the data after recovery
> -------------------------------------------------------------------
>
> Key: KAFKA-10101
> URL: https://issues.apache.org/jira/browse/KAFKA-10101
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.5.0
> Reporter: Jun Rao
> Priority: Major
>
> Currently, in Log.recoverLog(), we set recoveryPoint to logEndOffset after recovering the log segment. However, we don't flush the log segments after recovery. The potential issue is that if the broker has another hard failure, segments may be corrupted on disk but won't be going through recovery on another restart.
> This logic was introduced in KAFKA-5829 since 1.0.0.
[GitHub] [kafka] rhauch commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
rhauch commented on pull request #8069: URL: https://github.com/apache/kafka/pull/8069#issuecomment-638990957 retest this please
[GitHub] [kafka] rhauch commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
rhauch commented on pull request #8069: URL: https://github.com/apache/kafka/pull/8069#issuecomment-638991218 This should be backported to the `2.6` branch.
[GitHub] [kafka] rhauch edited a comment on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
rhauch edited a comment on pull request #8069: URL: https://github.com/apache/kafka/pull/8069#issuecomment-638991218 This should be backported at least to the `2.6` branch.
[GitHub] [kafka] mimaison commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
mimaison commented on pull request #8311: URL: https://github.com/apache/kafka/pull/8311#issuecomment-638872285 retest this please
[GitHub] [kafka] vvcephei commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs
vvcephei commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r435416204

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ##
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
     }

     private Map committedOffsetForChangelogs(final Set partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-

Review comment:
What's the idea of dropping this?

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ##
@@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) {
     }

     private Map committedOffsetForChangelogs(final Set partitions) {
-        if (partitions.isEmpty())
-            return Collections.emptyMap();
-
         final Map committedOffsets;
         try {
-            // those do not have a committed offset would default to 0
-            committedOffsets = mainConsumer.committed(partitions).entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset()));
-        } catch (final TimeoutException e) {
-            // if it timed out we just retry next time.
-            return Collections.emptyMap();
-        } catch (final KafkaException e) {
-            throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e);
+            committedOffsets = fetchCommittedOffsets(partitions, mainConsumer);
+        } catch (final StreamsException e) {
+            if (e.getCause() instanceof TimeoutException) {

Review comment:
This seems to be a step backwards, actually. Why wrap it as a StreamsException only just to immediately unwrap it again?
[GitHub] [kafka] vvcephei commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness
vvcephei commented on a change in pull request #8775:
URL: https://github.com/apache/kafka/pull/8775#discussion_r43547

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ##
@@ -302,14 +300,19 @@ public void computeTaskLags(final UUID uuid, final Map allTaskEndO
      * @return end offset sum - offset sum
      *          Task.LATEST_OFFSET if this was previously an active running task on this client
      */
-    long lagFor(final TaskId task) {
-        final Long totalLag = taskLagTotals.get(task);
+    public long lagFor(final TaskId task) {
+        final Long totalLag;
+        if (taskLagTotals.isEmpty()) {
+            // If we couldn't compute the task lags due to failure to fetch offsets, just return a flat constant
+            totalLag = 0L;

Review comment:
Is this the right constant to represent "we don't know the lag"? Or did I mistake how this is going to be used?
[jira] [Commented] (KAFKA-10101) recovery point is advanced without flushing the data after recovery
[ https://issues.apache.org/jira/browse/KAFKA-10101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126023#comment-17126023 ]

Jun Rao commented on KAFKA-10101:
---------------------------------

[~ijuma] : Could you double check if this is an issue? Thanks.

> recovery point is advanced without flushing the data after recovery
> -------------------------------------------------------------------
>
> Key: KAFKA-10101
> URL: https://issues.apache.org/jira/browse/KAFKA-10101
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.5.0
> Reporter: Jun Rao
> Priority: Major
>
> Currently, in Log.recoverLog(), we set recoveryPoint to logEndOffset after recovering the log segment. However, we don't flush the log segments after recovery. The potential issue is that if the broker has another hard failure, segments may be corrupted on disk but won't be going through recovery on another restart.
> This logic was introduced in KAFKA-5829 since 1.0.0.
[jira] [Resolved] (KAFKA-9576) Topic creation failure causing Stream thread death
[ https://issues.apache.org/jira/browse/KAFKA-9576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Boyang Chen resolved KAFKA-9576.
-------------------------------
Resolution: Duplicate

> Topic creation failure causing Stream thread death
> --------------------------------------------------
>
> Key: KAFKA-9576
> URL: https://issues.apache.org/jira/browse/KAFKA-9576
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.4.0
> Reporter: Boyang Chen
> Priority: Major
>
> The failure to create an internal topic could lead to the stream thread death due to timeout:
> {code:java}
> [2020-02-14T03:03:00-08:00] (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 11:03:00,083] ERROR [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] stream-thread [main] Unexpected error during topic description for stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog.
> [2020-02-14T03:03:00-08:00] (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) [2020-02-14 11:03:00,083] ERROR [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] stream-thread [stream-soak-test-c818a925-a8fd-4a81-9a26-1c744d52ff2f-StreamThread-1] Encountered the following unexpected Kafka exception during processing, this usually indicate Streams internal errors: (org.apache.kafka.streams.processor.internals.StreamThread)
> [2020-02-14T03:03:00-08:00] (streams-soak-2-4-eos_soak_i-01c4a64bbd04974db_streamslog) org.apache.kafka.streams.errors.StreamsException: Could not create topic stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-19-changelog.
> at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:209)
> at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:223)
> at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:106)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.prepareTopic(StreamsPartitionAssignor.java:1229)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:588)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:548)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:650)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1300(AbstractCoordinator.java:111)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:572)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:555)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:400)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:340)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:471)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1267)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
> at
[GitHub] [kafka] bbejeck commented on pull request #8789: MINOR: Fix the javadoc broken links of streams
bbejeck commented on pull request #8789: URL: https://github.com/apache/kafka/pull/8789#issuecomment-638979955 Thanks for the contribution, @vitojeng. This PR looks good to me, I'd like to verify the links locally, but after that, we'll get this merged.
[jira] [Created] (KAFKA-10103) JDBC Sink Connector doesn't support numerical values in event keys
Jakub created KAFKA-10103:
--------------------------

Summary: JDBC Sink Connector doesn't support numerical values in event keys
Key: KAFKA-10103
URL: https://issues.apache.org/jira/browse/KAFKA-10103
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Affects Versions: 2.0.1
Reporter: Jakub

Our topics contain events with numerical keys and Avro values. We're trying to configure a JDBC connector to export data from these topics to Oracle DB, but it doesn't seem to work.

If we use strings as keys everything works fine, but if we switch to Longs it stops working. We tried different values of _key.converter_, including _org.apache.kafka.connect.converters.ByteArrayConverter_, but either parsing of keys doesn't work, or they cannot be mapped to Oracle data type (NUMBER - this happens if we use _ByteArrayConverter_).

We also tried using transformations (CAST), but in that case we're getting _Cast transformation does not support casting to/from BYTES_

Please excuse if this is not a bug and there is a way to work with numerical keys, we just couldn't find anything about that in the documentation.
[GitHub] [kafka] heritamas commented on a change in pull request #8730: KAFKA-10048: Possible data gap for a consumer after a failover when u…
heritamas commented on a change in pull request #8730:
URL: https://github.com/apache/kafka/pull/8730#discussion_r435424111

## File path: connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java ##
@@ -132,7 +132,7 @@ public String version() {
         return listConsumerGroupOffsets(group).entrySet().stream()
             .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
             .map(x -> checkpoint(group, x.getKey(), x.getValue()))
-            .filter(x -> x.downstreamOffset() > 0) // ignore offsets we cannot translate accurately
+            .filter(x -> x.downstreamOffset() >= 0) // ignore offsets we cannot translate accurately

Review comment:
Can downstream offset be negative?
[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient
hachikuji commented on pull request #8724: URL: https://github.com/apache/kafka/pull/8724#issuecomment-638962844 ok to test
[GitHub] [kafka] omkreddy commented on pull request #8717: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config
omkreddy commented on pull request #8717: URL: https://github.com/apache/kafka/pull/8717#issuecomment-639036963 @bdbyrne Can you rebase the PR?
[jira] [Commented] (KAFKA-9377) Refactor StreamsPartitionAssignor Repartition Count logic
[ https://issues.apache.org/jira/browse/KAFKA-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126166#comment-17126166 ]

Boyang Chen commented on KAFKA-9377:
------------------------------------

[~feyman] Hey, I'm assigning this ticket to you. A little bit more context:

You could search for the function `StreamsPartitionAssignor#setRepartitionTopicMetadataNumberOfPartitions` and look at what it does currently. It basically tries to initialize the repartition topic count of every node by doing a random walk through all nodes in an infinite loop. This is neither efficient nor intuitive, and we have been planning to refactor it and build a bottom-up DFS search, meaning that a parent node could only be initialized after all its children's topic partitions are initialized.

Another reference is the unit test case `StreamsPartitionAssignor#shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology`, which validates a bug we fixed inside this logic a while ago and hopefully gives you better insight.

Let me know if this makes sense to you. I know it's a bit unfriendly for a beginner task, but it's definitely worth your effort to dig in and understand how the KStream topology is created.

> Refactor StreamsPartitionAssignor Repartition Count logic
> ---------------------------------------------------------
>
> Key: KAFKA-9377
> URL: https://issues.apache.org/jira/browse/KAFKA-9377
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Boyang Chen
> Assignee: feyman
> Priority: Major
>
> The current repartition count uses a big while loop to randomly initialize each repartition topic's count, which is error-prone and hard to maintain. A more intuitive and robust solution would be doing a DFS search from bottom-up, where we initialize all the sink nodes' repartition topic counts by making sure all their parents are initialized.
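The dependency-ordered traversal proposed in this ticket could look roughly like the sketch below. It is not the `StreamsPartitionAssignor` code: the class `RepartitionCounts`, the topic names, and the rule that a repartition topic takes the maximum partition count of the topics it depends on are all assumptions made for illustration. The point is only that each topic is resolved exactly once, after everything it depends on, instead of being retried in a loop.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the problem, not the StreamsPartitionAssignor implementation.
class RepartitionCounts {

    // upstream: repartition topic -> topics whose partition counts it depends on
    // known:    partition counts of ordinary source topics
    static Map<String, Integer> resolve(Map<String, List<String>> upstream, Map<String, Integer> known) {
        Map<String, Integer> resolved = new HashMap<>(known);
        Set<String> inProgress = new HashSet<>();
        for (String topic : upstream.keySet()) {
            visit(topic, upstream, resolved, inProgress);
        }
        return resolved;
    }

    // Depth-first: resolve every dependency first, then derive this topic's count from them.
    private static int visit(String topic, Map<String, List<String>> upstream,
                             Map<String, Integer> resolved, Set<String> inProgress) {
        Integer done = resolved.get(topic);
        if (done != null) {
            return done; // already resolved, or a source topic with a known count
        }
        List<String> deps = upstream.get(topic);
        if (deps == null || deps.isEmpty()) {
            throw new IllegalStateException("No partition count available for " + topic);
        }
        if (!inProgress.add(topic)) {
            throw new IllegalStateException("Cyclic dependency at " + topic);
        }
        int count = 0;
        for (String dep : deps) {
            // Assumed rule: take the maximum count among the dependencies.
            count = Math.max(count, visit(dep, upstream, resolved, inProgress));
        }
        inProgress.remove(topic);
        resolved.put(topic, count);
        return count;
    }

    public static void main(String[] args) {
        Map<String, List<String>> upstream = new HashMap<>();
        upstream.put("repartition-b", List.of("repartition-a", "input-2"));
        upstream.put("repartition-a", List.of("input-1"));
        Map<String, Integer> counts = resolve(upstream, Map.of("input-1", 4, "input-2", 6));
        System.out.println(counts.get("repartition-a") + " " + counts.get("repartition-b")); // prints "4 6"
    }
}
```

Memoizing in `resolved` keeps the work linear in the number of edges, and the `inProgress` set turns an unexpected cycle into an explicit error rather than an endless loop.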
[GitHub] [kafka] vvcephei commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
vvcephei commented on a change in pull request #8776:
URL: https://github.com/apache/kafka/pull/8776#discussion_r435523829

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();

-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {
+                return Task.LATEST_OFFSET;
+            } else {
+                offsetSum += offset;

Review comment:
It might be nice to have a sanity check here that `offset` is non-negative, since a negative value would indicate we've unexpectedly received a sentinel value. I thought we did that already, but it's obviously not here.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ##
@@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) {
             final long offset = changelogEntry.getValue();

-            offsetSum += offset;
-            if (offsetSum < 0) {
-                log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id);
-                return Long.MAX_VALUE;
+            if (offset == Task.LATEST_OFFSET) {

Review comment:
This seems pretty subtle, can you convert your GH explanation into a code comment? It also seems worth mentioning that if any changelog offset in the task is "latest", we assume the whole task is active and therefore return "latest". Took me a minute to work that out.
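The two review suggestions above (a sanity check for unexpected negative offsets, and a comment explaining the "latest" shortcut) could be combined as in the following sketch. It is not the `TaskManager` code: `OffsetSums` is a hypothetical class, the map key type is simplified to `String`, and the value `-2L` for the sentinel is an assumption standing in for `Task.LATEST_OFFSET`.

```java
import java.util.Map;

// Hypothetical sketch, not the TaskManager implementation.
class OffsetSums {
    // Stand-in for Task.LATEST_OFFSET; the concrete value is an assumption for this sketch.
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (long offset : changelogOffsets.values()) {
            if (offset == LATEST_OFFSET) {
                // If any changelog of the task reports "latest", the whole task is treated
                // as active, so the sum for the task is reported as "latest" as well.
                return LATEST_OFFSET;
            }
            if (offset < 0) {
                // Any other negative value would be an unexpected sentinel.
                throw new IllegalStateException("Unexpected negative changelog offset: " + offset);
            }
            offsetSum += offset;
            if (offsetSum < 0) {
                // Adding two non-negative longs can only go negative on overflow; pin to the maximum.
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfChangelogOffsets(Map.of("store-a", 5L, "store-b", 7L))); // prints 12
    }
}
```

Whether an unexpected sentinel should throw or only log a warning is a design choice the review does not settle; throwing is used here only to keep the sketch short.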
[GitHub] [kafka] omkreddy closed pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing topic
omkreddy closed pull request #8715: URL: https://github.com/apache/kafka/pull/8715
[GitHub] [kafka] omkreddy commented on a change in pull request #8717: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config
omkreddy commented on a change in pull request #8717:
URL: https://github.com/apache/kafka/pull/8717#discussion_r435468757

## File path: core/src/main/scala/kafka/server/AdminManager.scala ##
@@ -454,6 +454,9 @@ class AdminManager(val config: KafkaConfig,
   private def alterTopicConfigs(resource: ConfigResource, validateOnly: Boolean, configProps: Properties, configEntriesMap: Map[String, String]): (ConfigResource, ApiError) = {
     val topic = resource.name
+    if (!metadataCache.contains(topic))
+      throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")
+
     adminZkClient.validateTopicConfig(topic, configProps)

Review comment:
Yeah, there is a lot of scope for cleanup. We can improve this in future PRs.
[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness
vvcephei commented on pull request #8775: URL: https://github.com/apache/kafka/pull/8775#issuecomment-639068238 Test this please
[GitHub] [kafka] hachikuji commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
hachikuji commented on pull request #8486: URL: https://github.com/apache/kafka/pull/8486#issuecomment-639102074 ok to test
[GitHub] [kafka] szalapski commented on pull request #5876: KAFKA-7509: Avoid passing most non-applicable properties to producer, consumer, and admin client
szalapski commented on pull request #5876: URL: https://github.com/apache/kafka/pull/5876#issuecomment-639110680 I still hope this can be fixed soon.
[GitHub] [kafka] vvcephei commented on pull request #8775: KAFKA-10079: improve thread-level stickiness
vvcephei commented on pull request #8775: URL: https://github.com/apache/kafka/pull/8775#issuecomment-639128329
[GitHub] [kafka] xvrl opened a new pull request #8800: [MINOR] fix incorrect GC log size with JDK9+
xvrl opened a new pull request #8800: URL: https://github.com/apache/kafka/pull/8800 file size was incorrectly set to 100KB instead of 100MB
[GitHub] [kafka] d8tltanc commented on a change in pull request #8421: KAFKA-9800: [KIP-580] Admin Client Exponential Backoff Implementation
d8tltanc commented on a change in pull request #8421: URL: https://github.com/apache/kafka/pull/8421#discussion_r435492178 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -683,21 +686,65 @@ public Node provide() { } } +/** + * Provides context which the retry may refer to + */ +class CallRetryContext { + +private int tries = 0; +private long nextAllowedTryMs = 0; +private final double JITTER_MIN = 0.8; +private final double JITTER_MAX = 1.2; + +public int tries() { +return tries; +} + +public long nextAllowedTryMs() { +return nextAllowedTryMs; +} + +private void updateTries(int currentTries) { +this.tries = currentTries + 1; +} + +private void updateNextAllowTryMs() { +double jitter = Math.random() * (JITTER_MAX - JITTER_MIN) + JITTER_MIN; +int failures = tries - 1; +double exp = Math.pow(2, failures); +this.nextAllowedTryMs = time.milliseconds() + +(long) Math.min(retryBackoffMaxMs, jitter * exp * retryBackoffMs); +// TODO: Remove the line below +System.out.println("nextAllow = " + (long) Math.min(retryBackoffMaxMs, jitter * exp * retryBackoffMs)); +} Review comment: Makes sense. Will extract it to a util class in my KIP-601 implementation.
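Editorial sketch (not part of the thread): the review above discusses moving the jittered exponential backoff computation into a utility class. A minimal version of such a helper might look like the following. The class name `ExponentialBackoffSketch` and its method signatures are hypothetical and are not taken from the PR or from KIP-601; only the jitter bounds (0.8 and 1.2) and the formula mirror the quoted diff.

```java
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical helper, not part of the Kafka code base: computes a retry
 * delay using exponential backoff with multiplicative jitter.
 */
final class ExponentialBackoffSketch {
    // Jitter bounds copied from the quoted diff.
    private static final double JITTER_MIN = 0.8;
    private static final double JITTER_MAX = 1.2;

    private ExponentialBackoffSketch() { }

    /** Deterministic core: the jitter factor is a parameter so it can be tested. */
    static long backoffMs(long retryBackoffMs, long retryBackoffMaxMs, int failures, double jitter) {
        // Delay doubles with every failure; negative counts are treated as zero.
        double exp = Math.pow(2, Math.max(0, failures));
        // The result is capped at the configured maximum.
        return (long) Math.min(retryBackoffMaxMs, jitter * exp * retryBackoffMs);
    }

    /** Convenience overload that draws the jitter factor from [JITTER_MIN, JITTER_MAX). */
    static long backoffMs(long retryBackoffMs, long retryBackoffMaxMs, int failures) {
        double jitter = ThreadLocalRandom.current().nextDouble(JITTER_MIN, JITTER_MAX);
        return backoffMs(retryBackoffMs, retryBackoffMaxMs, failures, jitter);
    }
}
```

A caller would add the returned delay to the current time to obtain the next allowed attempt. Keeping the jitter as an explicit parameter in the core method makes the cap and the doubling behavior easy to unit test without relying on random values.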
[GitHub] [kafka] dajac opened a new pull request #8801: KAFKA-10100; LiveLeaders field in LeaderAndIsrRequest is not used anymore
dajac opened a new pull request #8801: URL: https://github.com/apache/kafka/pull/8801 We have noticed that the `LiveLeaders` field in the LeaderAndIsrRequest is not used anywhere but is still populated by the controller. It seems that the field was introduced in AK `0.8.0` and was supposed to be removed in AK `0.8.1`: https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/cluster/Partition.scala#L194. It has not been used since then. This PR proposes to simply stop populating the field for any version > 0. This avoids computing the live leaders set and also reduces the size of the request on the wire. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rhauch commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
rhauch commented on pull request #8069: URL: https://github.com/apache/kafka/pull/8069#issuecomment-639076340 retest this please
[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
ableegoldman commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r43000 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) { final long offset = changelogEntry.getValue(); -offsetSum += offset; -if (offsetSum < 0) { -log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id); -return Long.MAX_VALUE; +if (offset == Task.LATEST_OFFSET) { Review comment: Well, this is by definition of the sentinel `LATEST_OFFSET` -- but I agree it's subtle that if any offset is "latest" then we know they all are, i.e. the task is active and RUNNING
[GitHub] [kafka] vvcephei commented on pull request #8107: MINOR: Remove Diamond and code code Alignment
vvcephei commented on pull request #8107: URL: https://github.com/apache/kafka/pull/8107#issuecomment-639124739 Ok to test
[GitHub] [kafka] omkreddy commented on pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing
omkreddy commented on pull request #8715: URL: https://github.com/apache/kafka/pull/8715#issuecomment-639031210 @gnkoshelev Thanks for the PR. LGTM. I think we can merge both PRs (#8717). `AdminZkClient.validateTopicConfig()` is used in the command line tools (this will go away when we remove the ZK dependency). We may want to throw the same error in all places.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
ableegoldman commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r435552502 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) { final long offset = changelogEntry.getValue(); -offsetSum += offset; -if (offsetSum < 0) { -log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id); -return Long.MAX_VALUE; +if (offset == Task.LATEST_OFFSET) { +return Task.LATEST_OFFSET; +} else { +offsetSum += offset; Review comment: We put the check for negative offsets in the RecordCollector, but I guess it doesn't hurt to check again here?
[GitHub] [kafka] vvcephei commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
vvcephei commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r435559860 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) { final long offset = changelogEntry.getValue(); -offsetSum += offset; -if (offsetSum < 0) { -log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id); -return Long.MAX_VALUE; +if (offset == Task.LATEST_OFFSET) { +return Task.LATEST_OFFSET; +} else { +offsetSum += offset; Review comment: Aha! Thanks. Yeah, I'd be in favor of coding defensively here as well.
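Editorial sketch (not part of the thread): the three points raised in this review discussion are a sentinel short-circuit, a defensive check for negative offsets, and overflow pinning. They could be combined roughly as below. This is an illustration under stated assumptions, not the merged `TaskManager` code: the class name, the `String` map keys, the `-2L` sentinel value, and the choice to throw on a negative offset are all hypothetical.

```java
import java.util.Map;

/**
 * Hypothetical sketch, not the actual TaskManager code: sums changelog
 * offsets defensively.
 */
final class ChangelogOffsetSumSketch {
    // Stand-in for the Task.LATEST_OFFSET sentinel; the value is assumed for illustration.
    static final long LATEST_OFFSET = -2L;

    private ChangelogOffsetSumSketch() { }

    static long sumOfChangelogOffsets(Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (Map.Entry<String, Long> entry : changelogOffsets.entrySet()) {
            long offset = entry.getValue();
            if (offset == LATEST_OFFSET) {
                // If any changelog reports "latest", the whole task is treated as latest.
                return LATEST_OFFSET;
            }
            if (offset < 0) {
                // Defensive check: a real offset should never be negative.
                throw new IllegalStateException(
                    "Negative offset " + offset + " for changelog " + entry.getKey());
            }
            offsetSum += offset;
            if (offsetSum < 0) {
                // Adding two non-negative longs wrapped around, so pin to the maximum.
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }
}
```

Checking for the sentinel before the negativity check matters here, because the assumed sentinel is itself a negative number and would otherwise be rejected as invalid.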
[jira] [Resolved] (KAFKA-10069) The user-defined "predicate" and "negate" are not removed from Transformation
[ https://issues.apache.org/jira/browse/KAFKA-10069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-10069. Resolution: Fixed > The user-defined "predicate" and "negate" are not removed from Transformation > - > > Key: KAFKA-10069 > URL: https://issues.apache.org/jira/browse/KAFKA-10069 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Fix For: 2.6.0 > > > There are official configDef for both "predicate" and "negate" so we should > remove user-defined configDef. However, current behavior does incorrect > comparison so the duplicate key will destroy the following embed configDef. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10069) The user-defined "predicate" and "negate" are not removed from Transformation
[ https://issues.apache.org/jira/browse/KAFKA-10069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-10069: --- Fix Version/s: 2.6.0 > The user-defined "predicate" and "negate" are not removed from Transformation > - > > Key: KAFKA-10069 > URL: https://issues.apache.org/jira/browse/KAFKA-10069 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > Fix For: 2.6.0 > > > There are official configDef for both "predicate" and "negate" so we should > remove user-defined configDef. However, current behavior does incorrect > comparison so the duplicate key will destroy the following embed configDef.
[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient
hachikuji commented on pull request #8724: URL: https://github.com/apache/kafka/pull/8724#issuecomment-639033169 retest this please
[jira] [Assigned] (KAFKA-9377) Refactor StreamsPartitionAssignor Repartition Count logic
[ https://issues.apache.org/jira/browse/KAFKA-9377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen reassigned KAFKA-9377: -- Assignee: feyman (was: Boyang Chen) > Refactor StreamsPartitionAssignor Repartition Count logic > - > > Key: KAFKA-9377 > URL: https://issues.apache.org/jira/browse/KAFKA-9377 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: feyman >Priority: Major > > The current repartition count uses a big while loop to randomly initialize > each repartition topic counts, which is error-prone and hard to maintain. A > more intuitive and robust solution would be doing a DFS search from > bottom-up, where we initialize all the sink nodes repartition topic counts by > making sure all their parents are initialized.
[jira] [Created] (KAFKA-10104) Remove deprecated --zookeeper flags as specified in KIP-604
Colin McCabe created KAFKA-10104: Summary: Remove deprecated --zookeeper flags as specified in KIP-604 Key: KAFKA-10104 URL: https://issues.apache.org/jira/browse/KAFKA-10104 Project: Kafka Issue Type: Improvement Reporter: Colin McCabe Remove deprecated --zookeeper flags as specified in KIP-604
[GitHub] [kafka] C0urante commented on pull request #8135: KAFKA-9570: Define SSL configs in all worker config classes, not just distributed
C0urante commented on pull request #8135: URL: https://github.com/apache/kafka/pull/8135#issuecomment-639008252 @rhauch would it be possible to backport this to 2.6 before the upcoming release?
[GitHub] [kafka] rhauch commented on pull request #8135: KAFKA-9570: Define SSL configs in all worker config classes, not just distributed
rhauch commented on pull request #8135: URL: https://github.com/apache/kafka/pull/8135#issuecomment-639015817
[jira] [Commented] (KAFKA-10090) Misleading warnings: The configuration was supplied but isn't a known config
[ https://issues.apache.org/jira/browse/KAFKA-10090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126132#comment-17126132 ] Robert Wruck commented on KAFKA-10090: -- [~chia7712] That would be great. I'm not really familiar with the code, so I'd have to decide whether to implement a fix in ChannelBuilders or RecordingMap. > Misleading warnings: The configuration was supplied but isn't a known config > > > Key: KAFKA-10090 > URL: https://issues.apache.org/jira/browse/KAFKA-10090 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.5.0 >Reporter: Robert Wruck >Priority: Major > > In our setup (using Spring cloud stream Kafka binder), we see log messages > like: > > {{The configuration 'ssl.keystore.password' was supplied but isn't a known > config}} > > logged by org.apache.kafka.clients.admin.AdminClientConfig. The Kafka binder > actually uses SSL and security.protocol is set to SSL. > Looking through the code, a few things seem odd: > * The log message says "isn't a known config" but that's not true. It is > *known*, i.e. defined in ConfigDef, but not *used*. > * The method for detecting whether a config is actually *used* is not > complete. ChannelBuilders.channelBuilderConfigs() for example extracts the > configs to use for the created channel builder using *new > HashMap(config.values())* thus *get()* won't mark a config as used anymore.
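Editorial sketch (not part of the thread): the second bullet in the issue describes how copying a usage-tracking map into a plain `HashMap` hides later reads from the tracker. The effect can be reproduced in isolation with a small stand-in. `UsageTrackingConfigSketch` and its methods are hypothetical names chosen for illustration; this is not Kafka's `RecordingMap` or `AbstractConfig`, only a model of the behavior the reporter describes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical stand-in for a config object that records which keys were read.
 * Names and behavior are illustrative assumptions, not Kafka's implementation.
 */
final class UsageTrackingConfigSketch {
    private final Map<String, Object> values;
    private final Set<String> used = new HashSet<>();

    UsageTrackingConfigSketch(Map<String, ?> values) {
        this.values = new HashMap<>(values);
    }

    /** Reads that go through this method are recorded as "used". */
    Object get(String key) {
        used.add(key);
        return values.get(key);
    }

    /** Returns a plain copy; reads on the copy are invisible to this object. */
    Map<String, Object> valuesCopy() {
        return new HashMap<>(values);
    }

    /** Keys that were supplied but never read through get(). */
    Set<String> unused() {
        Set<String> result = new HashSet<>(values.keySet());
        result.removeAll(used);
        return result;
    }
}
```

In this model, a component that reads `ssl.keystore.password` from the result of `valuesCopy()` does use the setting, yet `unused()` still lists the key, which is the kind of situation that would lead to a misleading "supplied but isn't a known config" warning.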
[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on a change in pull request #8486: URL: https://github.com/apache/kafka/pull/8486#discussion_r435505756 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ## @@ -585,34 +585,29 @@ public void testFetchProgressWithMissingPartitionPosition() { consumer.seekToBeginning(singleton(tp1)); client.prepareResponse( -new MockClient.RequestMatcher() { -@Override -public boolean matches(AbstractRequest body) { -ListOffsetRequest request = (ListOffsetRequest) body; -Map timestamps = request.partitionTimestamps(); -return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP && -timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP; -} -}, listOffsetsResponse(Collections.singletonMap(tp0, 50L), +body -> { +ListOffsetRequest request = (ListOffsetRequest) body; +Map timestamps = request.partitionTimestamps(); +return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP && +timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP; +}, listOffsetsResponse(Collections.singletonMap(tp0, 50L), Collections.singletonMap(tp1, Errors.NOT_LEADER_FOR_PARTITION))); client.prepareResponse( -new MockClient.RequestMatcher() { -@Override -public boolean matches(AbstractRequest body) { -FetchRequest request = (FetchRequest) body; -return request.fetchData().keySet().equals(singleton(tp0)) && -request.fetchData().get(tp0).fetchOffset == 50L; +body -> { +FetchRequest request = (FetchRequest) body; +return request.fetchData().keySet().equals(singleton(tp0)) && +request.fetchData().get(tp0).fetchOffset == 50L; -} -}, fetchResponse(tp0, 50L, 5)); +}, fetchResponse(tp0, 50L, 5)); ConsumerRecords records = consumer.poll(Duration.ofMillis(1)); assertEquals(5, records.count()); assertEquals(singleton(tp0), records.partitions()); } private void initMetadata(MockClient mockClient, Map partitionCounts) { -MetadataResponse initialMetadata = TestUtils.metadataUpdateWith(1, partitionCounts); 
+int leaderEpoch = 1; Review comment: This is the actual fix, the other parts are mostly cleanup
[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r435589707 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) { final long offset = changelogEntry.getValue(); -offsetSum += offset; -if (offsetSum < 0) { -log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id); -return Long.MAX_VALUE; +if (offset == Task.LATEST_OFFSET) { +return Task.LATEST_OFFSET; +} else { +offsetSum += offset; Review comment: Ack
[GitHub] [kafka] mjsax commented on pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers
mjsax commented on pull request #8771: URL: https://github.com/apache/kafka/pull/8771#issuecomment-639157574 Thanks for the PR @astubbs! Merged to `trunk` and cherry-picked to `2.6`.
[GitHub] [kafka] hachikuji opened a new pull request #8802: MINOR: Fix fetch session epoch comment in `FetchRequest.json`
hachikuji opened a new pull request #8802: URL: https://github.com/apache/kafka/pull/8802 The current "about" string incorrectly describes the session epoch as the partition epoch. Rename to `SessionEpoch` to make usage clearer. Also rename `MaxWait` to `MaxWaitTimeMs` and `FetchableTopic` to `FetchTopic` for consistency with `FetchPartition`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] junrao commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
junrao commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r435558683 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ## @@ -113,14 +127,44 @@ public static ApiVersionsResponse fromStruct(Struct struct, short version) { } } -public static ApiVersionsResponse apiVersionsResponse(int throttleTimeMs, byte maxMagic) { +public static ApiVersionsResponse apiVersionsResponse( +int throttleTimeMs, +byte maxMagic, +Features latestSupportedFeatures) { +return apiVersionsResponse( +throttleTimeMs, maxMagic, latestSupportedFeatures, Optional.empty(), Optional.empty()); +} + +public static ApiVersionsResponse apiVersionsResponse( +int throttleTimeMs, +byte maxMagic, +Features latestSupportedFeatures, +Features finalizedFeatures, +int finalizedFeaturesEpoch) { +return apiVersionsResponse( +throttleTimeMs, maxMagic, latestSupportedFeatures, Optional.of(finalizedFeatures), Optional.of(finalizedFeaturesEpoch)); +} + +private static ApiVersionsResponse apiVersionsResponse( +int throttleTimeMs, +byte maxMagic, +Features latestSupportedFeatures, +Optional> finalizedFeatures, +Optional finalizedFeaturesEpoch) { Review comment: Could we pass in `Optional` instead of two separate Optional? ## File path: core/src/test/scala/unit/kafka/server/SupportedFeaturesTest.scala ## @@ -0,0 +1,39 @@ +package kafka.server Review comment: missing license header ## File path: core/src/main/scala/kafka/cluster/Broker.scala ## @@ -34,14 +36,21 @@ object Broker { brokerId: Int, endpoints: util.List[Endpoint], interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo + + def apply(id: Int, endPoints: Seq[EndPoint], rack: Option[String]): Broker = { +new Broker(id, endPoints, rack, emptySupportedFeatures) + } } /** * A Kafka broker. - * A broker has an id, a collection of end-points, an optional rack and a listener to security protocol map. - * Each end-point is (host, port, listenerName). 
+ * + * @param id a broker id + * @param endPoints a collection of EndPoint. Each end-point is (host, port, listener name, security protocol). + * @param rackan optional rack + * @param featuresoptional supported features Review comment: The comment can be a bit misleading since features is not Optional. ## File path: core/src/main/scala/kafka/server/FinalizedFeatureChangeListener.scala ## @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit} + +import kafka.utils.{Logging, ShutdownableThread} +import kafka.zk.{FeatureZNode,FeatureZNodeStatus, KafkaZkClient, ZkVersion} +import kafka.zookeeper.ZNodeChangeHandler +import org.apache.kafka.common.internals.FatalExitError + +import scala.concurrent.TimeoutException +import scala.util.control.Exception.ignoring + +/** + * Listens to changes in the ZK feature node, via the ZK client. Whenever a change notification + * is received from ZK, the feature cache in FinalizedFeatureCache is asynchronously updated + * to the latest features read from ZK. The cache updates are serialized through a single + * notification processor thread. 
+ * + * @param zkClient the Zookeeper client + */ +class FinalizedFeatureChangeListener(zkClient: KafkaZkClient) extends Logging { + + /** + * Helper class used to update the FinalizedFeatureCache. + * + * @param featureZkNodePath the path to the ZK feature node to be read + * @param maybeNotifyOnce an optional latch that can be used to notify the caller when an + *updateOrThrow() operation is over + */ + private class FeatureCacheUpdater(featureZkNodePath: String, maybeNotifyOnce: Option[CountDownLatch]) { + +def
[GitHub] [kafka] ableegoldman commented on pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it
ableegoldman commented on pull request #8803: URL: https://github.com/apache/kafka/pull/8803#issuecomment-639245176 Ready for review @abbccdda @guozhangwang
[GitHub] [kafka] ableegoldman opened a new pull request #8803: KAFKA-10102: update ProcessorTopology instead of rebuilding it
ableegoldman opened a new pull request #8803: URL: https://github.com/apache/kafka/pull/8803 Rather than recreate the entire topology when we find new regex matched input partitions, we can just update the relevant pieces. Only the SourceNodes and ProcessorTopology seem to care about the input topics, and we can actually extract this information out of the SourceNode altogether by pulling it into the ProcessorTopology.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest
ableegoldman commented on a change in pull request #8799: URL: https://github.com/apache/kafka/pull/8799#discussion_r435588847 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ## @@ -142,83 +142,89 @@ public void tearDown() throws IOException { @Test public void testRegexMatchesTopicsAWhenCreated() throws Exception { +try { +final Serde stringSerde = Serdes.String(); -final Serde stringSerde = Serdes.String(); - -final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); -// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed -// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always -// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test -final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); +final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); +// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed +// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always +// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". 
Note if KIP-429 behavior ever changed it may become a flaky test +final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); -CLUSTER.createTopic("TEST-TOPIC-1"); +CLUSTER.createTopic("TEST-TOPIC-1"); -final StreamsBuilder builder = new StreamsBuilder(); +final StreamsBuilder builder = new StreamsBuilder(); -final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); +final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); -pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); -final List assignedTopics = new CopyOnWriteArrayList<>(); -streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { -@Override -public Consumer getConsumer(final Map config) { -return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { -@Override -public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { -super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); -} -}; +pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); +final List assignedTopics = new CopyOnWriteArrayList<>(); +streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { +@Override +public Consumer getConsumer(final Map config) { +return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { +@Override +public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { +super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); +} +}; -} -}); +} +}); -streams.start(); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); +streams.start(); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); -CLUSTER.createTopic("TEST-TOPIC-2"); 
+CLUSTER.createTopic("TEST-TOPIC-2"); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); -streams.close(); -CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); +streams.close(); Review comment: Well, won't we end up deleting the topics before closing it if we never reach the first `streams.close`? Or does it not really matter in that case since something has already gone wrong (just curious, I'm fine with it as-is btw)
[jira] [Commented] (KAFKA-10097) Avoid getting null map for task checkpoint
[ https://issues.apache.org/jira/browse/KAFKA-10097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126244#comment-17126244 ] feyman commented on KAFKA-10097: Cool, thanks a lot! [~bchen225242] > Avoid getting null map for task checkpoint > -- > > Key: KAFKA-10097 > URL: https://issues.apache.org/jira/browse/KAFKA-10097 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Priority: Major > > In StreamTask, we have the logic to generate a checkpoint offset map to be > materialized through StateManager#checkpoint. This map could be either empty > map or null, which the former case indicates to only pull down existing state > store checkpoint data, while the latter indicates no need to do a checkpoint > in the case such as we are suspending a task. > Having two similar special logics for checkpointing could lead to unexpected > bugs, also we should think about separating the empty checkpoint case vs > passed-in checkpoint case.
[GitHub] [kafka] mjsax merged pull request #8771: MINOR: Add explanation for disabling forwarding from value transformers
mjsax merged pull request #8771: URL: https://github.com/apache/kafka/pull/8771
[GitHub] [kafka] mjsax merged pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers
mjsax merged pull request #8759: URL: https://github.com/apache/kafka/pull/8759
[jira] [Commented] (KAFKA-9991) Flaky Test KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
[ https://issues.apache.org/jira/browse/KAFKA-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126303#comment-17126303 ] Sophie Blee-Goldman commented on KAFKA-9991: Different test method: [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2734/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration/] h3. Stacktrace java.lang.AssertionError: Condition not met within timeout 3. Table did not read all values at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205) at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration(KTableSourceTopicRestartIntegrationTest.java:186) > Flaky Test > KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled > - > > Key: KAFKA-9991 > URL: https://issues.apache.org/jira/browse/KAFKA-9991 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test, unit-test > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6280/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/] > > h3. 
Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. Table did > not read all values at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at > org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205) > at > org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159) > at > org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10040) Make computing the PreferredReplicaImbalanceCount metric more efficient
[ https://issues.apache.org/jira/browse/KAFKA-10040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-10040. - Fix Version/s: 2.7.0 Resolution: Fixed > Make computing the PreferredReplicaImbalanceCount metric more efficient > --- > > Key: KAFKA-10040 > URL: https://issues.apache.org/jira/browse/KAFKA-10040 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > Fix For: 2.7.0 > > > At the moment, computing the PreferredReplicaImbalanceCount metric traverses > all the partitions in the cluster to find out the imbalance ones. This is > extremely costly in cluster with large number of partitions and this is done > after the processing of each event in the controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
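The ticket above describes the cost of rescanning every partition after each controller event. One common way to avoid such a scan is to keep a running counter that is adjusted whenever a single partition changes. The sketch below illustrates that general idea only; it is not taken from PR #8724, and the class `ImbalanceCountSketch`, its string partition names, and its `update` method are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

class ImbalanceCountSketch {
    // partition -> preferred replica id and partition -> current leader id (hypothetical model)
    private final Map<String, Integer> preferred = new HashMap<>();
    private final Map<String, Integer> leader = new HashMap<>();
    private int imbalanceCount = 0;

    private boolean isImbalanced(String partition) {
        Integer p = preferred.get(partition);
        Integer l = leader.get(partition);
        return p != null && l != null && !p.equals(l);
    }

    // Adjust the counter for the one partition that changed instead of rescanning all of them.
    void update(String partition, int preferredReplica, int currentLeader) {
        if (isImbalanced(partition)) {
            imbalanceCount--;
        }
        preferred.put(partition, preferredReplica);
        leader.put(partition, currentLeader);
        if (isImbalanced(partition)) {
            imbalanceCount++;
        }
    }

    // Constant-time read of the metric.
    int imbalanceCount() {
        return imbalanceCount;
    }

    // The full traversal, kept only to cross-check the counter.
    int fullScan() {
        int count = 0;
        for (String partition : preferred.keySet()) {
            if (isImbalanced(partition)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        ImbalanceCountSketch metric = new ImbalanceCountSketch();
        metric.update("t-0", 1, 1);
        metric.update("t-1", 1, 2);
        System.out.println(metric.imbalanceCount() == metric.fullScan());
    }
}
```

The trade-off is that every code path that changes a leader or a replica assignment has to go through `update`, otherwise the counter drifts from the true value.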
[GitHub] [kafka] hachikuji merged pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient
hachikuji merged pull request #8724: URL: https://github.com/apache/kafka/pull/8724
[GitHub] [kafka] hachikuji commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
hachikuji commented on pull request #8486: URL: https://github.com/apache/kafka/pull/8486#issuecomment-639211997 ok to test
[GitHub] [kafka] mjsax commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest
mjsax commented on a change in pull request #8799: URL: https://github.com/apache/kafka/pull/8799#discussion_r435583650 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ## @@ -142,83 +142,89 @@ public void tearDown() throws IOException { @Test public void testRegexMatchesTopicsAWhenCreated() throws Exception { +try { +final Serde stringSerde = Serdes.String(); -final Serde stringSerde = Serdes.String(); - -final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); -// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed -// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always -// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test -final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); +final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); +// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed +// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always +// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". 
Note if KIP-429 behavior ever changed it may become a flaky test +final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); -CLUSTER.createTopic("TEST-TOPIC-1"); +CLUSTER.createTopic("TEST-TOPIC-1"); -final StreamsBuilder builder = new StreamsBuilder(); +final StreamsBuilder builder = new StreamsBuilder(); -final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); +final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); -pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); -final List assignedTopics = new CopyOnWriteArrayList<>(); -streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { -@Override -public Consumer getConsumer(final Map config) { -return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { -@Override -public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { -super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); -} -}; +pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); +final List assignedTopics = new CopyOnWriteArrayList<>(); +streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { +@Override +public Consumer getConsumer(final Map config) { +return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { +@Override +public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { +super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); +} +}; -} -}); +} +}); -streams.start(); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); +streams.start(); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); -CLUSTER.createTopic("TEST-TOPIC-2"); 
+CLUSTER.createTopic("TEST-TOPIC-2"); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); -streams.close(); -CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); +streams.close(); Review comment: I don't think that is necessary -- there is an `@After` method that closes the client for us.
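The ordering question in this review thread can be made concrete with a small sketch. It assumes the new `try` block ends in a `finally` that deletes the topics (the quoted diff does not show that part) and that the test runner invokes the `@After` method even when the test body throws. `CleanupOrderSketch` and its event strings are hypothetical stand-ins, not code from `RegexSourceIntegrationTest`.

```java
import java.util.ArrayList;
import java.util.List;

class CleanupOrderSketch {
    final List<String> events = new ArrayList<>();

    // Stand-in for the @Test method; the finally block is an assumption about the PR.
    void testBody(boolean failEarly) {
        try {
            events.add("start streams");
            if (failEarly) {
                throw new AssertionError("condition not met within timeout");
            }
            events.add("close streams (test body)");
        } finally {
            events.add("delete topics");
        }
    }

    // Stand-in for the @After method that closes the client.
    void tearDown() {
        events.add("close streams (@After)");
    }

    // Mimics the runner: tearDown is called whether or not the test body failed.
    List<String> run(boolean failEarly) {
        try {
            testBody(failEarly);
        } catch (AssertionError failure) {
            // the runner would record the failure here
        } finally {
            tearDown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(new CleanupOrderSketch().run(true));
    }
}
```

With `failEarly` set, the topics are deleted before the client is closed, which is the situation the review question asks about. The client is still closed by the `@After` stand-in.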
[GitHub] [kafka] ableegoldman commented on a change in pull request #8775: KAFKA-10079: improve thread-level stickiness
ableegoldman commented on a change in pull request #8775: URL: https://github.com/apache/kafka/pull/8775#discussion_r435585609 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java ## @@ -302,14 +300,19 @@ public void computeTaskLags(final UUID uuid, final Map allTaskEndO * @return end offset sum - offset sum * Task.LATEST_OFFSET if this was previously an active running task on this client */ -long lagFor(final TaskId task) { -final Long totalLag = taskLagTotals.get(task); +public long lagFor(final TaskId task) { +final Long totalLag; +if (taskLagTotals.isEmpty()) { +// If we couldn't compute the task lags due to failure to fetch offsets, just return a flat constant +totalLag = 0L; Review comment: The value itself doesn't matter, just that it's constant across all tasks. But I'm guessing you meant, why not use the existing `UNKNOWN_OFFSET_SUM` sentinel, in which case the answer is probably just that I forgot about it. Anyway I did a slight additional refactoring beyond this, just fyi: instead of skipping the lag computation when we fail to fetch offsets, we now always initialize the lags and just pass in the `UNKNOWN_OFFSET_SUM` for all stateful tasks when the offset fetch fails. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
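A minimal sketch of the refactoring described in this comment, under stated assumptions: when the offset fetch fails, the lags are still initialized, with the same sentinel for every stateful task, so `lagFor` never has to special-case an empty map. `LagSketch`, the string task ids, `initializeUnknownLags`, and the sentinel value `-2L` are illustrative only; the real `UNKNOWN_OFFSET_SUM` and `TaskId` are defined in Kafka Streams.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class LagSketch {
    // Hypothetical value; stands in for the UNKNOWN_OFFSET_SUM sentinel.
    static final long UNKNOWN_OFFSET_SUM = -2L;

    private final Map<String, Long> taskLagTotals = new HashMap<>();

    // Normal path: lag = end offset sum - previously recorded offset sum.
    void computeTaskLags(Map<String, Long> endOffsetSums, Map<String, Long> offsetSums) {
        for (Map.Entry<String, Long> entry : endOffsetSums.entrySet()) {
            long offsetSum = offsetSums.getOrDefault(entry.getKey(), 0L);
            taskLagTotals.put(entry.getKey(), entry.getValue() - offsetSum);
        }
    }

    // Failure path: every stateful task gets the same constant, so no task looks "closer" than another.
    void initializeUnknownLags(Set<String> statefulTasks) {
        for (String task : statefulTasks) {
            taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
        }
    }

    long lagFor(String task) {
        Long lag = taskLagTotals.get(task);
        if (lag == null) {
            throw new IllegalStateException("No lag recorded for task " + task);
        }
        return lag;
    }

    public static void main(String[] args) {
        LagSketch client = new LagSketch();
        client.initializeUnknownLags(Set.of("0_1", "0_2"));
        System.out.println(client.lagFor("0_1") == client.lagFor("0_2"));
    }
}
```

As the comment notes, the particular value does not matter for the assignment; what matters is that it is identical across tasks.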
[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs
ableegoldman commented on a change in pull request #8787: URL: https://github.com/apache/kafka/pull/8787#discussion_r435590436 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) { } private Map committedOffsetForChangelogs(final Set partitions) { -if (partitions.isEmpty()) -return Collections.emptyMap(); - Review comment: The diff is a bit misleading, this was also factored out into the new `ClientUtils#fetchCommittedOffsets` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on pull request #8776: URL: https://github.com/apache/kafka/pull/8776#issuecomment-639154057 > I'm mildly concerned that we're using the state to implement idempotence of processing steps, but the states aren't 1:1 with the steps. For example, when we go from RUNNING to CLOSED, we transition through both prepareClose() and close(). But there's no CLOSE_PREPARED state, so there's no way to really differentiate that specifically prepareClose() has completed before, but not close(). Correct. Note that there will be more follow-up PRs that will remove, e.g., `prepareClose()`, so this will be addressed soon.
[GitHub] [kafka] mjsax commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest
mjsax commented on a change in pull request #8799: URL: https://github.com/apache/kafka/pull/8799#discussion_r435597397 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ## @@ -142,83 +142,89 @@ public void tearDown() throws IOException { @Test public void testRegexMatchesTopicsAWhenCreated() throws Exception { +try { +final Serde stringSerde = Serdes.String(); -final Serde stringSerde = Serdes.String(); - -final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); -// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed -// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always -// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test -final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); +final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); +// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed +// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always +// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". 
Note if KIP-429 behavior ever changed it may become a flaky test +final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); -CLUSTER.createTopic("TEST-TOPIC-1"); +CLUSTER.createTopic("TEST-TOPIC-1"); -final StreamsBuilder builder = new StreamsBuilder(); +final StreamsBuilder builder = new StreamsBuilder(); -final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); +final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); -pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); -final List assignedTopics = new CopyOnWriteArrayList<>(); -streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { -@Override -public Consumer getConsumer(final Map config) { -return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { -@Override -public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { -super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); -} -}; +pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); +final List assignedTopics = new CopyOnWriteArrayList<>(); +streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { +@Override +public Consumer getConsumer(final Map config) { +return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { +@Override +public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { +super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); +} +}; -} -}); +} +}); -streams.start(); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); +streams.start(); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); -CLUSTER.createTopic("TEST-TOPIC-2"); 
+CLUSTER.createTopic("TEST-TOPIC-2"); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); -streams.close(); -CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); +streams.close(); Review comment: Yes, that is my reasoning. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8759: KAFKA-10066: TestOutputTopic should pass record headers into deserializers
mjsax commented on pull request #8759: URL: https://github.com/apache/kafka/pull/8759#issuecomment-639170115 Merged to `trunk` and cherry-picked to `2.6` and `2.5` branches.
[GitHub] [kafka] ableegoldman commented on pull request #8775: KAFKA-10079: improve thread-level stickiness
ableegoldman commented on pull request #8775: URL: https://github.com/apache/kafka/pull/8775#issuecomment-639184354 Java8 failed with `KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicNotWrittenToDuringRestoration` Java14 failed with `KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled` I've seen both of these be flaky already (and frankly am a bit concerned about them...) but I'll see if I can reproduce this locally in case this PR is somehow making them worse
[GitHub] [kafka] vitojeng commented on pull request #8789: MINOR: Fix the javadoc broken links of streams
vitojeng commented on pull request #8789: URL: https://github.com/apache/kafka/pull/8789#issuecomment-639186748 Thanks @bbejeck .
[GitHub] [kafka] guozhangwang commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
guozhangwang commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r435636306 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -247,48 +251,75 @@ public void completeRestoration() { */ @Override public void prepareSuspend() { -if (state() == State.CREATED || state() == State.SUSPENDED) { -// do nothing -log.trace("Skip prepare suspending since state is {}", state()); -} else if (state() == State.RUNNING) { -closeTopology(true); +switch (state()) { Review comment: Yup, makes sense, just curious :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9991) Flaky Test KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled
[ https://issues.apache.org/jira/browse/KAFKA-9991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126302#comment-17126302 ] Sophie Blee-Goldman commented on KAFKA-9991: [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/898/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/] h3. Stacktrace java.lang.AssertionError: Condition not met within timeout 3. Table did not read all values at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205) at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159) at org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143) > Flaky Test > KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled > - > > Key: KAFKA-9991 > URL: https://issues.apache.org/jira/browse/KAFKA-9991 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Major > Labels: flaky-test, unit-test > > 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6280/testReport/junit/org.apache.kafka.streams.integration/KTableSourceTopicRestartIntegrationTest/shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled/] > > h3. Stacktrace > java.lang.AssertionError: Condition not met within timeout 3. Table did > not read all values at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at > org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.assertNumberValuesRead(KTableSourceTopicRestartIntegrationTest.java:205) > at > org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosEnabled(KTableSourceTopicRestartIntegrationTest.java:159) > at > org.apache.kafka.streams.integration.KTableSourceTopicRestartIntegrationTest.shouldRestoreAndProgressWhenTopicWrittenToDuringRestorationWithEosAlphaEnabled(KTableSourceTopicRestartIntegrationTest.java:143) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax commented on a change in pull request #8776: KAFKA-9441: Improve Kafka Streams task management
mjsax commented on a change in pull request #8776: URL: https://github.com/apache/kafka/pull/8776#discussion_r435588028 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -613,10 +604,14 @@ private long sumOfChangelogOffsets(final TaskId id, final Map changelogEntry : changelogOffsets.entrySet()) { final long offset = changelogEntry.getValue(); -offsetSum += offset; -if (offsetSum < 0) { -log.warn("Sum of changelog offsets for task {} overflowed, pinning to Long.MAX_VALUE", id); -return Long.MAX_VALUE; +if (offset == Task.LATEST_OFFSET) { Review comment: Sure, can add a comment. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
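The loop under review can be sketched in isolation. The overflow check and the pinning to `Long.MAX_VALUE` come from the quoted diff; what happens inside the new `if (offset == Task.LATEST_OFFSET)` branch is not shown there, so returning the sentinel immediately is an assumption. `OffsetSumSketch`, the string partition keys, and the value `-2L` are hypothetical.

```java
import java.util.Map;

class OffsetSumSketch {
    // Hypothetical value; stands in for Task.LATEST_OFFSET.
    static final long LATEST_OFFSET = -2L;

    static long sumOfChangelogOffsets(Map<String, Long> changelogOffsets) {
        long offsetSum = 0L;
        for (long offset : changelogOffsets.values()) {
            if (offset == LATEST_OFFSET) {
                // Assumption: a sentinel entry short-circuits the sum (branch body not shown in the diff).
                return LATEST_OFFSET;
            }
            offsetSum += offset;
            if (offsetSum < 0) {
                // Adding non-negative offsets wrapped around, so pin the result.
                return Long.MAX_VALUE;
            }
        }
        return offsetSum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfChangelogOffsets(Map.of("store-changelog-0", 5L, "store-changelog-1", 7L)));
    }
}
```

Checking for the sentinel before adding matters: a negative sentinel added to the sum would otherwise trip the overflow check and be reported as `Long.MAX_VALUE`.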
[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs
ableegoldman commented on a change in pull request #8787: URL: https://github.com/apache/kafka/pull/8787#discussion_r435593477 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java ## @@ -562,23 +564,18 @@ private void restoreChangelog(final ChangelogMetadata changelogMetadata) { } private Map committedOffsetForChangelogs(final Set partitions) { -if (partitions.isEmpty()) -return Collections.emptyMap(); - final Map committedOffsets; try { -// those do not have a committed offset would default to 0 -committedOffsets = mainConsumer.committed(partitions).entrySet().stream() -.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() == null ? 0L : e.getValue().offset())); -} catch (final TimeoutException e) { -// if it timed out we just retry next time. -return Collections.emptyMap(); -} catch (final KafkaException e) { -throw new StreamsException(String.format("Failed to retrieve end offsets for %s", partitions), e); +committedOffsets = fetchCommittedOffsets(partitions, mainConsumer); +} catch (final StreamsException e) { +if (e.getCause() instanceof TimeoutException) { Review comment: I thought this might raise some eyebrows. I wanted to keep the ClientUtils methods consistent, and thought wrapping everything as a StreamsException would be cleaner. But maybe it makes more sense to throw the TimeoutException separately... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
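The trade-off raised here, wrapping everything in one exception type and then inspecting the cause, can be sketched as follows. The nested exception classes are local stand-ins so the example compiles without Kafka on the classpath, and `fetchCommittedOffsets` taking a `Supplier` is a hypothetical simplification of the helper named in the diff.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.Supplier;

class CommittedOffsetsSketch {
    // Local stand-ins for Kafka's TimeoutException and StreamsException.
    static class TimeoutException extends RuntimeException {
        TimeoutException(String message) {
            super(message);
        }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: wraps every failure of the consumer call in a StreamsException.
    static Map<String, Long> fetchCommittedOffsets(Supplier<Map<String, Long>> consumerCall) {
        try {
            return consumerCall.get();
        } catch (RuntimeException e) {
            throw new StreamsException("Failed to retrieve committed offsets", e);
        }
    }

    // Caller: a timeout is retried later (empty result), anything else propagates.
    static Map<String, Long> committedOffsetForChangelogs(Supplier<Map<String, Long>> consumerCall) {
        try {
            return fetchCommittedOffsets(consumerCall);
        } catch (StreamsException e) {
            if (e.getCause() instanceof TimeoutException) {
                return Collections.emptyMap();
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> result = committedOffsetForChangelogs(() -> {
            throw new TimeoutException("broker did not respond");
        });
        System.out.println(result.isEmpty());
    }
}
```

The alternative mentioned in the comment, letting the timeout propagate as its own type, would replace the `instanceof` check on the cause with a second `catch` clause in the caller.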
[jira] [Assigned] (KAFKA-10102) Source node references not updated after rebuilding topology
[ https://issues.apache.org/jira/browse/KAFKA-10102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10102: --- Assignee: (was: Sophie Blee-Goldman) > Source node references not updated after rebuilding topology > > > Key: KAFKA-10102 > URL: https://issues.apache.org/jira/browse/KAFKA-10102 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > Luckily this bug was caught by > RegexSourceIntegrationTest#testRegexRecordsAreProcessedAfterReassignment – we > saw it fail with an NPE during SourceNode#deserializeKey, implying that the > key deserializer was null which in turns implies that the source node was > never initialized. > This can happen when a task is updated with new regex matched topic > partitions. In order to update the topology with the new input partitions, we > actually just create an entirely new topology from scratch including building > new source node objects. We then re/initialize this new topology once the > task is resumed. > The problem is that the task's RecordQueues save a reference to the > corresponding source node, and use this to pass polled records into the > topology. But the RecordQueues aren't updated with the newly built source > nodes and still point to the original nodes. > If the task had not completed restoration before being updated with new > partitions, it would never have initialized the original topology or source > nodes, resulting in an NPE when the RecordQueue passes a record to the old, > uninitialized source node. > This is the only specific known bug, but I haven't checked the entire code > base so it's possible there are other node references saved that might result > in bugs. We should try and avoid rebuilding an entirely new topology if at > all possible, and see if we can just update the input partitions only where > necessary -- This message was sent by Atlassian Jira (v8.3.4#803005)
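The failure mode in this ticket can be reproduced with a toy model: a queue that keeps a reference to a source node, and a rebuilt topology whose new node the queue never learns about. The classes below are simplified, hypothetical stand-ins for `SourceNode` and `RecordQueue`, and `setSource` is an illustrative remedy rather than the fix proposed in the ticket, which suggests avoiding the rebuild altogether.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class StaleSourceNodeSketch {
    static class SourceNode {
        // Stays null until init() runs, like a deserializer set up during topology initialization.
        private Function<byte[], String> keyDeserializer;

        void init() {
            keyDeserializer = bytes -> new String(bytes, StandardCharsets.UTF_8);
        }

        String deserializeKey(byte[] rawKey) {
            return keyDeserializer.apply(rawKey); // NullPointerException if init() never ran
        }
    }

    static class RecordQueue {
        private SourceNode source;

        RecordQueue(SourceNode source) {
            this.source = source;
        }

        // Hypothetical remedy: point the queue at the rebuilt node.
        void setSource(SourceNode source) {
            this.source = source;
        }

        String poll(byte[] rawKey) {
            return source.deserializeKey(rawKey);
        }
    }

    public static void main(String[] args) {
        SourceNode original = new SourceNode();        // never initialized: restoration had not completed
        RecordQueue queue = new RecordQueue(original);
        SourceNode rebuilt = new SourceNode();         // new topology built for the new partitions
        rebuilt.init();
        try {
            queue.poll("key".getBytes(StandardCharsets.UTF_8));
        } catch (NullPointerException e) {
            System.out.println("queue still points at the uninitialized node");
        }
        queue.setSource(rebuilt);
        System.out.println(queue.poll("key".getBytes(StandardCharsets.UTF_8)));
    }
}
```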
[jira] [Assigned] (KAFKA-9923) Join window store duplicates can be compacted in changelog
[ https://issues.apache.org/jira/browse/KAFKA-9923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-9923: -- Assignee: Bruno Cadonna > Join window store duplicates can be compacted in changelog > --- > > Key: KAFKA-9923 > URL: https://issues.apache.org/jira/browse/KAFKA-9923 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Bruno Cadonna >Priority: Blocker > Fix For: 2.6.0 > > > Stream-stream joins use the regular `WindowStore` implementation but with > `retainDuplicates` set to true. To allow for duplicates while using the same > unique-key underlying stores we just wrap the key with an incrementing > sequence number before inserting it. > This wrapping occurs at the innermost layer of the store hierarchy, which > means the duplicates must first pass through the changelogging layer. At this > point the keys are still identical. So, we end up sending the records to the > changelog without distinct keys and therefore may lose the older of the > duplicates during compaction. -- This message was sent by Atlassian Jira (v8.3.4#803005)
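A minimal sketch of the failure mode described in KAFKA-9923, not the actual Streams store code. All names here (`DuplicateKeySketch`, `wrap`, `compact`) are hypothetical, and log compaction is modeled simply as "the last value per key wins". It shows that logging the unwrapped key can lose the older duplicate, while logging the sequence-wrapped key keeps both.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DuplicateKeySketch {
    // Hypothetical stand-in for the store's key wrapping: append an incrementing sequence number.
    public static String wrap(String key, int seqnum) {
        return key + "#" + seqnum;
    }

    // Simplified model of compaction: only the latest value per key survives.
    public static Map<String, String> compact(List<String[]> records) {
        Map<String, String> compacted = new LinkedHashMap<>();
        for (String[] record : records) {
            compacted.put(record[0], record[1]);
        }
        return compacted;
    }

    public static void main(String[] args) {
        // Duplicates logged before wrapping: both records share the key "k".
        List<String[]> unwrapped = List.of(new String[] {"k", "v1"}, new String[] {"k", "v2"});
        // Duplicates logged after wrapping: each record has a distinct key.
        List<String[]> wrapped = List.of(new String[] {wrap("k", 0), "v1"}, new String[] {wrap("k", 1), "v2"});

        System.out.println("unwrapped survivors: " + compact(unwrapped).size());
        System.out.println("wrapped survivors:   " + compact(wrapped).size());
    }
}
```

Under this model, one way to avoid the loss would be to make the keys distinct before they reach the changelogging layer; whether that is the right fix for the real store hierarchy is not something this sketch can settle.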
[jira] [Assigned] (KAFKA-10102) Source node references not updated after rebuilding topology
[ https://issues.apache.org/jira/browse/KAFKA-10102?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sophie Blee-Goldman reassigned KAFKA-10102: --- Assignee: Sophie Blee-Goldman > Source node references not updated after rebuilding topology > > > Key: KAFKA-10102 > URL: https://issues.apache.org/jira/browse/KAFKA-10102 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Sophie Blee-Goldman >Assignee: Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.6.0 > > > Luckily this bug was caught by > RegexSourceIntegrationTest#testRegexRecordsAreProcessedAfterReassignment – we > saw it fail with an NPE during SourceNode#deserializeKey, implying that the > key deserializer was null, which in turn implies that the source node was > never initialized. > This can happen when a task is updated with new regex matched topic > partitions. In order to update the topology with the new input partitions, we > actually just create an entirely new topology from scratch including building > new source node objects. We then re/initialize this new topology once the > task is resumed. > The problem is that the task's RecordQueues save a reference to the > corresponding source node, and use this to pass polled records into the > topology. But the RecordQueues aren't updated with the newly built source > nodes and still point to the original nodes. > If the task had not completed restoration before being updated with new > partitions, it would never have initialized the original topology or source > nodes, resulting in an NPE when the RecordQueue passes a record to the old, > uninitialized source node. > This is the only specific known bug, but I haven't checked the entire code > base so it's possible there are other node references saved that might result > in bugs. 
We should try and avoid rebuilding an entirely new topology if at > all possible, and see if we can just update the input partitions only where > necessary -- This message was sent by Atlassian Jira (v8.3.4#803005)
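The stale-reference problem in KAFKA-10102 can be illustrated with a small sketch. This is not the Streams implementation: `SourceNodeSketch`, `RecordQueueSketch` and `updateSource` are hypothetical stand-ins, assumed only for illustration. The queue captures a node reference once, the topology is rebuilt with a fresh node, and only the fresh node is initialized.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class StaleReferenceSketch {
    // Hypothetical stand-in for a source node: unusable until init() supplies a deserializer.
    static class SourceNodeSketch {
        private Function<byte[], String> keyDeserializer; // stays null until init() is called

        void init() {
            keyDeserializer = bytes -> new String(bytes, StandardCharsets.UTF_8);
        }

        String deserializeKey(byte[] raw) {
            return keyDeserializer.apply(raw); // throws NullPointerException if never initialized
        }
    }

    // Hypothetical stand-in for a record queue that saves a reference to its source node.
    static class RecordQueueSketch {
        private SourceNodeSketch source;

        RecordQueueSketch(SourceNodeSketch source) {
            this.source = source;
        }

        void updateSource(SourceNodeSketch rebuilt) {
            this.source = rebuilt;
        }

        String poll(byte[] rawKey) {
            return source.deserializeKey(rawKey);
        }
    }

    static boolean pollFails(RecordQueueSketch queue) {
        try {
            queue.poll("k".getBytes(StandardCharsets.UTF_8));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    // Returns {failsWithStaleReference, failsAfterUpdatingReference}.
    public static boolean[] run() {
        SourceNodeSketch original = new SourceNodeSketch(); // never initialized
        RecordQueueSketch queue = new RecordQueueSketch(original);

        SourceNodeSketch rebuilt = new SourceNodeSketch();  // topology rebuilt from scratch
        rebuilt.init();                                     // only the new node is initialized

        boolean before = pollFails(queue);                  // queue still points at the original node
        queue.updateSource(rebuilt);
        boolean after = pollFails(queue);
        return new boolean[] {before, after};
    }

    public static void main(String[] args) {
        boolean[] result = run();
        System.out.println("stale reference fails: " + result[0]);
        System.out.println("updated reference fails: " + result[1]);
    }
}
```

The sketch repoints the queue at the rebuilt node; the ticket itself suggests a different direction, namely avoiding the rebuild and updating only the input partitions.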
[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient
hachikuji commented on pull request #8724: URL: https://github.com/apache/kafka/pull/8724#issuecomment-639189300 I will go ahead and merge this. I built locally with JDK11 and Scala 2.13. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs
ableegoldman commented on pull request #8787: URL: https://github.com/apache/kafka/pull/8787#issuecomment-639205947 Hey @vvcephei, I addressed your comments and added tests. Let me know if there's any test coverage that still seems missing.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs
ableegoldman commented on a change in pull request #8787: URL: https://github.com/apache/kafka/pull/8787#discussion_r435647575 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map clientState .flatMap(Collection::stream) .collect(Collectors.toList()); -final Collection allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions); -allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic())); +final Set preexistingChangelogPartitions = new HashSet<>(); +final Set preexistingSourceChangelogPartitions = new HashSet<>(); +final Set newlyCreatedChangelogPartitions = new HashSet<>(); +for (final TopicPartition changelog : allChangelogPartitions) { +if (newlyCreatedChangelogs.contains(changelog.topic())) { +newlyCreatedChangelogPartitions.add(changelog); +} else if (optimizedSourceChangelogs.contains(changelog.topic())) { +preexistingSourceChangelogPartitions.add(changelog); +} else { +preexistingChangelogPartitions.add(changelog); +} +} + +// Make the listOffsets request first so it can fetch the offsets for non-source changelogs +// asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets +final KafkaFuture> endOffsetsFuture = +fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient); -final Collection allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions); - allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions); +final Map sourceChangelogEndOffsets = +fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer()); -final Map endOffsets = -fetchEndOffsets(allPreexistingChangelogPartitions, adminClient); +final Map endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture); -allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, 
changelogsByStatefulTask, allNewlyCreatedChangelogPartitions); +allTaskEndOffsetSums = computeEndOffsetSumsByTask( +changelogsByStatefulTask, +endOffsets, +sourceChangelogEndOffsets, +newlyCreatedChangelogPartitions); fetchEndOffsetsSuccessful = true; -} catch (final StreamsException e) { +} catch (final StreamsException | TimeoutException e) { Review comment: @vvcephei I've been wondering if maybe we should _only_ catch the TimeoutException, and interpret a StreamsException as fatal (like IllegalStateException for example). This is how we were using `Consumer#committed` in the StoreChangelogReader, and AFAICT that only throws KafkaException on "unrecoverable errors" (quoted from javadocs). But I can't tell whether the Admin's `listOffsets` might throw on transient errors, so I'm leaning towards catching both just to be safe. WDYT?
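The ordering this diff aims for (issue the asynchronous request first, make the blocking call while it is in flight, then join the future and treat a timeout as retriable) can be sketched with plain JDK types. This is an illustration under assumptions, not the assignor code: `fetchEndOffsetsAsync`, `fetchCommittedBlocking` and the `-1` "retry later" result are hypothetical, and the `TimeoutException` here is `java.util.concurrent.TimeoutException`, not Kafka's.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class OverlappedFetchSketch {
    // Hypothetical stand-in for an asynchronous admin request.
    static CompletableFuture<Map<String, Long>> fetchEndOffsetsAsync() {
        return CompletableFuture.supplyAsync(() -> Map.of("changelog-0", 100L));
    }

    // Hypothetical stand-in for a blocking consumer call.
    static Map<String, Long> fetchCommittedBlocking() {
        return Map.of("source-0", 42L);
    }

    // Returns the sum of all fetched offsets, or -1 if the fetch failed and should be retried later.
    public static long fetchOffsetSum(long timeoutMs) {
        try {
            // Start the asynchronous request first so it runs in the background...
            CompletableFuture<Map<String, Long>> future = fetchEndOffsetsAsync();
            // ...while the blocking call executes on this thread.
            Map<String, Long> committed = fetchCommittedBlocking();
            // Only now wait for the asynchronous result.
            Map<String, Long> endOffsets = future.get(timeoutMs, TimeUnit.MILLISECONDS);

            long sum = 0;
            for (long offset : committed.values()) {
                sum += offset;
            }
            for (long offset : endOffsets.values()) {
                sum += offset;
            }
            return sum;
        } catch (TimeoutException | ExecutionException e) {
            // Treated as transient in this sketch: report failure instead of propagating.
            return -1L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1L;
        }
    }

    public static void main(String[] args) {
        System.out.println("offset sum: " + fetchOffsetSum(5_000));
    }
}
```

Which exceptions should count as transient is exactly the open question in the review comment; the multi-catch above takes the "catch both to be safe" side purely for illustration.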
[GitHub] [kafka] ableegoldman commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest
ableegoldman commented on a change in pull request #8799: URL: https://github.com/apache/kafka/pull/8799#discussion_r435584438 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ## @@ -142,83 +142,89 @@ public void tearDown() throws IOException { @Test public void testRegexMatchesTopicsAWhenCreated() throws Exception { +try { +final Serde stringSerde = Serdes.String(); -final Serde stringSerde = Serdes.String(); - -final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); -// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed -// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always -// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test -final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); +final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); +// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed +// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always +// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". 
Note if KIP-429 behavior ever changed it may become a flaky test +final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); -CLUSTER.createTopic("TEST-TOPIC-1"); +CLUSTER.createTopic("TEST-TOPIC-1"); -final StreamsBuilder builder = new StreamsBuilder(); +final StreamsBuilder builder = new StreamsBuilder(); -final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); +final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); -pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); -final List assignedTopics = new CopyOnWriteArrayList<>(); -streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { -@Override -public Consumer getConsumer(final Map config) { -return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { -@Override -public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { -super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); -} -}; +pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); +final List assignedTopics = new CopyOnWriteArrayList<>(); +streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { +@Override +public Consumer getConsumer(final Map config) { +return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { +@Override +public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { +super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); +} +}; -} -}); +} +}); -streams.start(); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); +streams.start(); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); -CLUSTER.createTopic("TEST-TOPIC-2"); 
+CLUSTER.createTopic("TEST-TOPIC-2"); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); -streams.close(); -CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); +streams.close(); Review comment: Then why close it here as well?
[GitHub] [kafka] mjsax commented on a change in pull request #8799: KAFKA-8011: Fix flaky RegexSourceIntegrationTest
mjsax commented on a change in pull request #8799: URL: https://github.com/apache/kafka/pull/8799#discussion_r435587017 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ## @@ -142,83 +142,89 @@ public void tearDown() throws IOException { @Test public void testRegexMatchesTopicsAWhenCreated() throws Exception { +try { +final Serde stringSerde = Serdes.String(); -final Serde stringSerde = Serdes.String(); - -final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); -// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed -// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always -// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429 behavior ever changed it may become a flaky test -final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); +final List expectedFirstAssignment = Collections.singletonList("TEST-TOPIC-1"); +// we compare lists of subscribed topics and hence requiring the order as well; this is guaranteed +// with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add TEST-TOPIC-2 so the list is always +// in the order of "TEST-TOPIC-1, TEST-TOPIC-2". 
Note if KIP-429 behavior ever changed it may become a flaky test +final List expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2"); -CLUSTER.createTopic("TEST-TOPIC-1"); +CLUSTER.createTopic("TEST-TOPIC-1"); -final StreamsBuilder builder = new StreamsBuilder(); +final StreamsBuilder builder = new StreamsBuilder(); -final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); +final KStream pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d")); -pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); -final List assignedTopics = new CopyOnWriteArrayList<>(); -streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { -@Override -public Consumer getConsumer(final Map config) { -return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { -@Override -public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { -super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); -} -}; +pattern1Stream.to(outputTopic, Produced.with(stringSerde, stringSerde)); +final List assignedTopics = new CopyOnWriteArrayList<>(); +streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() { +@Override +public Consumer getConsumer(final Map config) { +return new KafkaConsumer(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) { +@Override +public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) { +super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener)); +} +}; -} -}); +} +}); -streams.start(); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); +streams.start(); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED); -CLUSTER.createTopic("TEST-TOPIC-2"); 
+CLUSTER.createTopic("TEST-TOPIC-2"); -TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); +TestUtils.waitForCondition(() -> assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED); -streams.close(); -CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2"); +streams.close(); Review comment: I think it's better to first close it before we delete the topics.
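The pattern under discussion (close explicitly before deleting topics on the success path, and close again in a `finally` block so a failed assertion does not leak the instance) can be sketched as follows. This assumes the close operation is idempotent; `AppSketch` and the logged step names are hypothetical and only record the order of operations.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderSketch {
    // Hypothetical stand-in for a closeable application whose close() is idempotent.
    static class AppSketch {
        private final List<String> log;
        private boolean closed = false;

        AppSketch(List<String> log) {
            this.log = log;
        }

        void close() {
            if (!closed) {
                closed = true;
                log.add("close");
            }
        }
    }

    // Returns the order of cleanup steps for a passing or failing test body.
    public static List<String> runTest(boolean failMidway) {
        List<String> log = new ArrayList<>();
        AppSketch app = new AppSketch(log);
        try {
            if (failMidway) {
                throw new IllegalStateException("simulated assertion failure");
            }
            app.close();             // success path: close before touching the topics
            log.add("deleteTopics");
        } catch (IllegalStateException e) {
            log.add("failed");
        } finally {
            app.close();             // no-op on the success path, cleanup on the failure path
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(runTest(false));
        System.out.println(runTest(true));
    }
}
```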
[jira] [Commented] (KAFKA-9203) kafka-client 2.3.1 fails to consume lz4 compressed topic
[ https://issues.apache.org/jira/browse/KAFKA-9203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126293#comment-17126293 ] Ismael Juma commented on KAFKA-9203: Hi [~manthanit]. Thanks for providing this information. I was personally quite puzzled how the original change caused problems. But I reverted it out of caution since it was a clean-up and I wasn't sure if there was a bug in the underlying library. [~dwatzke] Is there any chance you were including multiple versions of lz4 in the classpath by accident? > kafka-client 2.3.1 fails to consume lz4 compressed topic > > > Key: KAFKA-9203 > URL: https://issues.apache.org/jira/browse/KAFKA-9203 > Project: Kafka > Issue Type: Bug > Components: compression, consumer >Affects Versions: 2.3.0, 2.3.1 >Reporter: David Watzke >Assignee: Ismael Juma >Priority: Blocker > Fix For: 2.4.0, 2.3.2 > > Attachments: kafka-clients-2.3.2-SNAPSHOT.jar > > > I run kafka cluster 2.1.1 > when I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead > of 2.2.0, I immediately started getting the following exceptions in a loop > when consuming a topic with LZ4-compressed messages: > {noformat} > 2019-11-18 11:59:16.888 ERROR [afka-consumer-thread] > com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred > while polling and processing messages: org.apache.kafka.common.KafkaExce > ption: Received exception when fetching the next record from > FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue > consumption. > org.apache.kafka.common.KafkaException: Received exception when fetching the > next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the > record to continue consumption. 
> at > org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473) > > at > org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332) > > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645) > > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263) > > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) > at > com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180) > > at > com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149) > at > com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28) > at > com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19) > > at > resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88) > > at > scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) > at scala.util.control.Exception$Catch.apply(Exception.scala:228) > at scala.util.control.Exception$Catch.either(Exception.scala:252) > at > resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) > at > resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) > at > resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) > at > resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) > at > resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25) > > at > resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25) > > at > resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50) > > at > 
resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53) > > at > resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53) > > at > resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) > at > com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19) > at > com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18) > > at > resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88) > > at > scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) > at scala.util.control.Exception$Catch.apply(Exception.scala:228) > at
[GitHub] [kafka] guozhangwang commented on pull request #8776: KAFKA-9441: Improve Kafka Streams task management
guozhangwang commented on pull request #8776: URL: https://github.com/apache/kafka/pull/8776#issuecomment-639196728 test this please
[jira] [Created] (KAFKA-10105) Regression in group coordinator dealing with flaky clients joining while leaving
William Reynolds created KAFKA-10105: Summary: Regression in group coordinator dealing with flaky clients joining while leaving Key: KAFKA-10105 URL: https://issues.apache.org/jira/browse/KAFKA-10105 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.4.1 Environment: Kafka 1.1.0 on jre 8 on debian 9 in docker Kafka 2.4.1 on jre 11 on debian 9 in docker Reporter: William Reynolds Since upgrading a cluster from 1.1.0 to 2.4.1, the broker no longer correctly handles a consumer sending a join after a leave. What happens now is that if a consumer sends a leave and then tries to send a join again while it is shutting down, the group coordinator adds the leaving member to the group but never seems to heartbeat that member. Since the consumer is then gone, when it joins again after restarting it is added as a new member, but the zombie member is still there and is included in the partition assignment, which means that its partitions are never consumed. What can also happen is that one of the zombies becomes group leader, so the rebalance gets stuck forever and the group is entirely blocked. I have not been able to track down where this got introduced between 1.1.0 and 2.4.1 but I will look further into this. Unfortunately the logs are essentially silent about the zombie members, and I only had INFO level logging on during the issue. By stopping all the consumers in the group and restarting the broker coordinating that group, we could get back to a working state.
[GitHub] [kafka] abbccdda commented on pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable
abbccdda commented on pull request #8486: URL: https://github.com/apache/kafka/pull/8486#issuecomment-639265937 retest please
[GitHub] [kafka] ning2008wisc commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0
ning2008wisc commented on a change in pull request #7577: URL: https://github.com/apache/kafka/pull/7577#discussion_r435705249 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -316,6 +316,69 @@ public void testReplication() throws InterruptedException { backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count()); } + +@Test +public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedException { + +// create consumers before starting the connectors so we don't need to wait for discovery +Consumer consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( +"group.id", "consumer-group-1"), "test-topic-1"); +consumer1.poll(Duration.ofMillis(500)); +consumer1.commitSync(); +consumer1.close(); + +// enable automated consumer group offset sync +mm2Props.put("sync.group.offsets.enabled", "true"); +mm2Props.put("sync.group.offsets.interval.seconds", "1"); +// one way replication from primary to backup +mm2Props.put("backup->primary.enabled", "false"); + +mm2Config = new MirrorMakerConfig(mm2Props); + +waitUntilMirrorMakerIsRunning(backup, mm2Config, "primary", "backup"); + +// sleep 5 seconds to ensure the automated group offset sync is complete +time.sleep(5000); + +// create a consumer at backup cluster with same consumer group Id to consume 1 topic +Consumer consumer = backup.kafka().createConsumerAndSubscribeTo( +Collections.singletonMap("group.id", "consumer-group-1"), "primary.test-topic-1"); +ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); +// the size of consumer record should be zero, because the offsets of the same consumer group +// have been automatically synchronized from primary to backup by the background job, so no +// more records to consume from the replicated topic by the same consumer group at backup cluster +assertTrue("consumer record size is not zero", records.count() == 0); + 
+// now create a new topic in primary cluster +primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS); +backup.kafka().createTopic("primary.test-topic-2", 1); +// produce some records to the new topic in primary cluster +for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) { +primary.kafka().produce("test-topic-2", i % NUM_PARTITIONS, "key", "message-1-" + i); +} + +// create a consumer at primary cluster to consume the new topic +consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( +"group.id", "consumer-group-1"), "test-topic-2"); +consumer1.poll(Duration.ofMillis(500)); +consumer1.commitSync(); +consumer1.close(); + +// sleep 5 seconds to ensure the automated group offset sync is complete +time.sleep(5000); + +// create a consumer at backup cluster with same consumer group Id to consume old and new topic +consumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( +"group.id", "consumer-group-1"), "primary.test-topic-1", "primary.test-topic-2"); + +records = consumer.poll(Duration.ofMillis(500)); +// similar reasoning as above, no more records to consume by the same consumer group at backup cluster +assertTrue("consumer record size is not zero", records.count() == 0); Review comment: updated ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java ## @@ -207,8 +185,30 @@ public void close() { backup.stop(); } + + @Test public void testReplication() throws InterruptedException { + +// create consumers before starting the connectors so we don't need to wait for discovery +Consumer consumer3 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap( Review comment: updated to `consumer1` and `consumer2` ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java ## @@ -64,4 +70,58 @@ public void testCheckpoint() { assertEquals(13, checkpoint2.downstreamOffset()); assertEquals(234L, 
sourceRecord2.timestamp().longValue()); } + +@Test +public void testSyncOffset() { +Map> idleConsumerGroupsOffset = new HashMap<>(); +Map> checkpointsPerConsumerGroup = new HashMap<>(); + +String consumer1 = "consumer1"; +String consumer2 = "consumer2"; + +String topic1 = "topic1"; +String topic2 = "topic2"; + +// 'c1t1' denotes consumer offsets of all partitions of topic1 for consumer1 +
[jira] [Commented] (KAFKA-10090) Misleading warnings: The configuration was supplied but isn't a known config
[ https://issues.apache.org/jira/browse/KAFKA-10090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17126412#comment-17126412 ] Chia-Ping Tsai commented on KAFKA-10090: It seems to me that the PR should include following changes. 1. [ChannelBuilders#channelBuilderConfigs|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java#L162] should not return copy of configs. 2. verify the unused keys (see https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/network/ChannelBuildersTest.java#L71) [~rwruck] Feel free to file a PR to resolve this issue. The PR is able to bring more discussion :) > Misleading warnings: The configuration was supplied but isn't a known config > > > Key: KAFKA-10090 > URL: https://issues.apache.org/jira/browse/KAFKA-10090 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.5.0 >Reporter: Robert Wruck >Priority: Major > > In our setup (using Spring cloud stream Kafka binder), we see log messages > like: > > {{The configuration 'ssl.keystore.password' was supplied but isn't a known > config}} > > logged by org.apache.kafka.clients.admin.AdminClientConfig. The Kafka binder > actually uses SSL and security.protocol is set to SSL. > Looking through the code, a few things seem odd: > * The log message says "isn't a known config" but that's not true. It is > *known*, i.e. defined in ConfigDef, but not *used*. > * The method for detecting whether a config is actually *used* is not > complete. ChannelBuilders.channelBuilderConfigs() for example extracts the > configs to use for the created channel builder using *new > HashMap(config.values())* thus *get()* won't mark a config as used anymore. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
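The second point in the KAFKA-10090 description (reads through a copied `HashMap` no longer mark a config as used) can be reproduced with a small sketch. `RecordingMapSketch` and `unusedAfterRead` are hypothetical names assumed for illustration; this is a simplified model of usage tracking, not the client's actual config classes.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class UsedConfigSketch {
    // Hypothetical map that remembers which keys were read through get().
    static class RecordingMapSketch extends HashMap<String, Object> {
        private final Set<String> used = new HashSet<>();

        RecordingMapSketch(Map<String, Object> values) {
            super(values);
        }

        @Override
        public Object get(Object key) {
            if (key instanceof String) {
                used.add((String) key);
            }
            return super.get(key);
        }

        Set<String> unused() {
            Set<String> unused = new HashSet<>(keySet());
            unused.removeAll(used);
            return unused;
        }
    }

    // Reads one config either through the recording map or through a plain copy of it,
    // then reports which keys the recording map still considers unused.
    public static Set<String> unusedAfterRead(boolean readThroughCopy) {
        RecordingMapSketch configs = new RecordingMapSketch(Map.of("ssl.keystore.password", "secret"));
        Map<String, Object> view = readThroughCopy ? new HashMap<>(configs) : configs;
        view.get("ssl.keystore.password"); // the value is consumed either way
        return configs.unused();
    }

    public static void main(String[] args) {
        System.out.println("read directly:     unused = " + unusedAfterRead(false));
        System.out.println("read through copy: unused = " + unusedAfterRead(true));
    }
}
```

In this model the copy is what produces the misleading "unused" report, which is consistent with the suggestion in the comment that the channel builder configs should not be returned as a copy.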
[jira] [Created] (KAFKA-10106) measure and log time taken to handle LeaderAndIsr request
NIKHIL created KAFKA-10106: -- Summary: measure and log time taken to handle LeaderAndIsr request Key: KAFKA-10106 URL: https://issues.apache.org/jira/browse/KAFKA-10106 Project: Kafka Issue Type: Improvement Reporter: NIKHIL ReplicaManager!becomeLeaderOrFollower handles the LeaderAndIsr request, and StateChangeLogger logs when this request is handled. However, it would be useful to also log when this call ends and to record the time taken, which can help operationally. The proposal is for ReplicaManager!becomeLeaderOrFollower to start measuring the time before the `replicaStateChangeLock` is acquired and to log it before the response is returned.
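A minimal sketch of the proposed measurement, written in Java for consistency with the other code in this thread and not taken from the broker. `TimedHandlerSketch`, `timed` and the lock field are hypothetical. The timer starts before the lock is acquired, so time spent waiting for the lock is included, and the elapsed time is logged before the result reaches the caller.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class TimedHandlerSketch {
    // Hypothetical stand-in for the state change lock guarding the handler.
    private static final ReentrantLock STATE_CHANGE_LOCK = new ReentrantLock();

    // Runs the handler under the lock and logs the total elapsed time, including lock wait.
    public static <T> T timed(String requestName, Supplier<T> handler) {
        long startNs = System.nanoTime(); // start measuring before acquiring the lock
        STATE_CHANGE_LOCK.lock();
        try {
            return handler.get();
        } finally {
            STATE_CHANGE_LOCK.unlock();
            long elapsedMs = (System.nanoTime() - startNs) / 1_000_000;
            // Logged before the caller receives the response.
            System.out.println("Finished " + requestName + " request in " + elapsedMs + " ms");
        }
    }

    public static void main(String[] args) {
        String response = timed("LeaderAndIsr", () -> "ok");
        System.out.println("response: " + response);
    }
}
```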
[GitHub] [kafka] mimaison commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
mimaison commented on pull request #8311: URL: https://github.com/apache/kafka/pull/8311#issuecomment-638836545 retest this please
[GitHub] [kafka] mimaison commented on a change in pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
mimaison commented on a change in pull request #8311: URL: https://github.com/apache/kafka/pull/8311#discussion_r435243287 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -570,6 +592,27 @@ public void testDeleteTopics() throws Exception { } } +@Test +public void testDeleteTopicsPartialResponse() throws Exception { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + +env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest, +prepareDeleteTopicsResponse("myTopic", Errors.NONE)); +Map<String, KafkaFuture<Void>> values = env.adminClient().deleteTopics(asList("myTopic", "myOtherTopic"), +new DeleteTopicsOptions()).values(); +values.get("myTopic").get(); + +try { Review comment: We can use `TestUtils.assertFutureThrows()` here too ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -3382,6 +3425,90 @@ public void testAlterClientQuotas() throws Exception { } } +@Test +public void testAlterReplicaLogDirsSuccess() throws Exception { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +createAlterLogDirsResponse(env, env.cluster().nodeById(0), Errors.NONE, 0); +createAlterLogDirsResponse(env, env.cluster().nodeById(1), Errors.NONE, 0); + +TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); +TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + +Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); +logDirs.put(tpr0, "/data0"); +logDirs.put(tpr1, "/data1"); +AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); +assertNull(result.values().get(tpr0).get()); +assertNull(result.values().get(tpr1).get()); +} +} + +@Test +public void testAlterReplicaLogDirsLogDirNotFound() throws Exception { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +createAlterLogDirsResponse(env, env.cluster().nodeById(0), Errors.NONE, 0); +createAlterLogDirsResponse(env, env.cluster().nodeById(1), Errors.LOG_DIR_NOT_FOUND, 0); + +TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); +TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + +Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); +logDirs.put(tpr0, "/data0"); +logDirs.put(tpr1, "/data1"); +AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); +assertNull(result.values().get(tpr0).get()); +TestUtils.assertFutureError(result.values().get(tpr1), LogDirNotFoundException.class); +} +} + +@Test +public void testAlterReplicaLogDirsUnrequested() throws Exception { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +createAlterLogDirsResponse(env, env.cluster().nodeById(0), Errors.NONE, 1, 2); + +TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 1, 0); + +Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); +logDirs.put(tpr1, "/data1"); +AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); +assertNull(result.values().get(tpr1).get()); +} +} + +@Test +public void testAlterReplicaLogDirsPartialResponse() throws Exception { +try (AdminClientUnitTestEnv env = mockClientEnv()) { +createAlterLogDirsResponse(env, env.cluster().nodeById(0), Errors.NONE, 1); + +TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 1, 0); +TopicPartitionReplica tpr2 = new TopicPartitionReplica("topic", 2, 0); + +Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); +logDirs.put(tpr1, "/data1"); +logDirs.put(tpr2, "/data1"); +AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); +assertNull(result.values().get(tpr1).get()); +try { Review comment: We can use `TestUtils.assertFutureThrows()` here too
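Both review comments suggest replacing a hand-written `try` block with a helper that asserts a future fails with a given cause. The sketch below is not Kafka's `TestUtils`; it is a hypothetical, stand-alone helper (`FutureAssertSketch.expectFutureFailure`) that only illustrates the pattern such a helper typically follows: wait on the future, unwrap the `ExecutionException`, and check the cause type.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical sketch; the name and signature are assumptions, not Kafka's API.
final class FutureAssertSketch {

    // Returns the cause if the future failed with the expected type, otherwise fails.
    static <T extends Throwable> T expectFutureFailure(Future<?> future, Class<T> expectedCause) {
        try {
            future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (expectedCause.isInstance(cause)) {
                return expectedCause.cast(cause);
            }
            throw new AssertionError(
                "Expected cause " + expectedCause.getName() + " but got " + cause, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new AssertionError("Interrupted while waiting for the future", e);
        }
        throw new AssertionError(
            "Expected the future to fail with " + expectedCause.getName() + " but it completed");
    }

    public static void main(String[] args) {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no response for partition"));
        IllegalStateException cause = expectFutureFailure(failed, IllegalStateException.class);
        System.out.println("future failed as expected: " + cause.getMessage());
    }
}
```

With a helper of this shape, each partial-response test shrinks to a single assertion line instead of a `try`/`catch`/`fail` block.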
[GitHub] [kafka] ijuma merged pull request #8644: KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602)
ijuma merged pull request #8644: URL: https://github.com/apache/kafka/pull/8644
[GitHub] [kafka] ijuma commented on pull request #8644: KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602)
ijuma commented on pull request #8644: URL: https://github.com/apache/kafka/pull/8644#issuecomment-638845776 Thanks for the contribution! Merged to trunk and 2.6 branches.
[GitHub] [kafka] tombentley commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs
tombentley commented on pull request #8311: URL: https://github.com/apache/kafka/pull/8311#issuecomment-638846166 @mimaison done.
[jira] [Created] (KAFKA-10102) Source node references not updated after rebuilding topology
Sophie Blee-Goldman created KAFKA-10102: --- Summary: Source node references not updated after rebuilding topology Key: KAFKA-10102 URL: https://issues.apache.org/jira/browse/KAFKA-10102 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Fix For: 2.6.0 Luckily this bug was caught by RegexSourceIntegrationTest#testRegexRecordsAreProcessedAfterReassignment: we saw it fail with an NPE during SourceNode#deserializeKey, implying that the key deserializer was null, which in turn implies that the source node was never initialized. This can happen when a task is updated with new regex-matched topic partitions. In order to update the topology with the new input partitions, we actually just create an entirely new topology from scratch, including building new source node objects. We then re/initialize this new topology once the task is resumed. The problem is that the task's RecordQueues save a reference to the corresponding source node, and use this to pass polled records into the topology. But the RecordQueues aren't updated with the newly built source nodes and still point to the original nodes. If the task had not completed restoration before being updated with new partitions, it would never have initialized the original topology or source nodes, resulting in an NPE when the RecordQueue passes a record to the old, uninitialized source node. This is the only specific known bug, but I haven't checked the entire code base, so it's possible there are other saved node references that might result in bugs. We should try to avoid rebuilding an entirely new topology if at all possible, and see if we can just update the input partitions only where necessary.
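The stale-reference failure described above can be reproduced in miniature. This is a simplified sketch under stated assumptions: `SourceNode`, `RecordQueue`, and `Task` here are hypothetical toy classes that only borrow the names from the report, and re-pointing the queue is just one possible remedy (the issue itself suggests avoiding the full rebuild).

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical toy model; not the Kafka Streams internals.
final class StaleSourceNodeSketch {

    static final class SourceNode {
        private Function<byte[], String> keyDeserializer; // stays null until init()

        void init() {
            keyDeserializer = bytes -> new String(bytes, StandardCharsets.UTF_8);
        }

        String deserializeKey(byte[] rawKey) {
            return keyDeserializer.apply(rawKey); // NPE if init() was never called
        }
    }

    static final class RecordQueue {
        private SourceNode source;

        RecordQueue(SourceNode source) {
            this.source = source;
        }

        void setSource(SourceNode source) {
            this.source = source;
        }

        String deserializeKey(byte[] rawKey) {
            return source.deserializeKey(rawKey);
        }
    }

    static final class Task {
        private SourceNode source = new SourceNode();
        private final RecordQueue queue = new RecordQueue(source);

        // Builds a brand-new node; optionally re-points the queue at it.
        void rebuildTopology(boolean refreshQueue) {
            source = new SourceNode();
            if (refreshQueue) {
                queue.setSource(source);
            }
        }

        // Only the current (rebuilt) node gets initialized.
        void initializeTopology() {
            source.init();
        }

        String process(byte[] rawKey) {
            return queue.deserializeKey(rawKey);
        }
    }

    // Rebuild before the first initialization, then process one record.
    static boolean failsWithNpe(boolean refreshQueue) {
        Task task = new Task();
        task.rebuildTopology(refreshQueue);
        task.initializeTopology();
        try {
            task.process("key".getBytes(StandardCharsets.UTF_8));
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("stale queue reference fails: " + failsWithNpe(false));
        System.out.println("refreshed queue reference fails: " + failsWithNpe(true));
    }
}
```

The toy model fails only when the queue keeps pointing at the original, never-initialized node, which matches the sequence of events in the report.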
[GitHub] [kafka] ijuma commented on pull request #8644: KAFKA-9313: Set `use_all_dns_ips` as the new default for `client.dns.lookup` (KIP-602)
ijuma commented on pull request #8644: URL: https://github.com/apache/kafka/pull/8644#issuecomment-638622720 retest this please
[GitHub] [kafka] highluck commented on pull request #8107: MINOR: Remove Diamond and code code Alignment
highluck commented on pull request #8107: URL: https://github.com/apache/kafka/pull/8107#issuecomment-638630457 @vvcephei code update!
[GitHub] [kafka] tombentley commented on pull request #8755: KAFKA-10069: Correctly remove user-defined "predicate" and "negate" configs from transformation properties
tombentley commented on pull request #8755: URL: https://github.com/apache/kafka/pull/8755#issuecomment-638661645 Yes, thanks, that was a good spot @chia7712!