[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags
lkokhreidze commented on pull request #10802: URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063755741 @cadonna done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823421770 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/RemoveNamedTopologyResult.java ## @@ -52,34 +60,54 @@ public RemoveNamedTopologyResult(final KafkaFuture removeTopologyFuture) { * successfully completed their corresponding {@link KafkaFuture}. */ public final KafkaFuture all() { -final KafkaFutureImpl result = new KafkaFutureImpl<>(); +if (resetOffsetsFuture == null) { +return removeTopologyFuture; +} else { +return resetOffsetsFuture; Review comment: Basically we now have the caller thread perform the offset reset and block on it when it goes to call `get()` on the future returned by `RemoveNamedTopologyResult#all` (or `#resetOffsetsFuture`)
[GitHub] [kafka] ableegoldman commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823421009 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -264,48 +267,46 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl } } -private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl removeTopologyFuture, - final Set partitionsToReset) { -if (!partitionsToReset.isEmpty()) { -removeTopologyFuture.whenComplete((v, throwable) -> { -if (throwable != null) { -removeTopologyFuture.completeExceptionally(throwable); -} -DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; -while (deleteOffsetsResult == null) { -try { -deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); -deleteOffsetsResult.all().get(); -} catch (final InterruptedException ex) { -ex.printStackTrace(); -break; -} catch (final ExecutionException ex) { -if (ex.getCause() != null && -ex.getCause() instanceof GroupSubscribedToTopicException && -ex.getCause() -.getMessage() -.equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) { -ex.printStackTrace(); -} else if (ex.getCause() != null && -ex.getCause() instanceof GroupIdNotFoundException) { -log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further."); -break; -} else { -removeTopologyFuture.completeExceptionally(ex); -} -deleteOffsetsResult = null; -} -try { -Thread.sleep(100); -} catch (final InterruptedException ex) { -ex.printStackTrace(); +private void resetOffsets(final Set partitionsToReset) throws StreamsException { Review comment: Sorry for the large diff -- it's mainly due to spacing from having moved the `!partitionsToReset.isEmpty()` check, plus one small stylistic change to use a `while true` loop with `break`s because following the null status of the `deleteOffsetsResult` was a bit confusing. The real change though is that this method now just performs the offset resets directly, rather than directing whoever completes the `removeNamedTopology` future to perform the offset reset (which is non-trivial and thus not appropriate for the StreamThreads to do). We now invoke this directly when the user calls `get()` on the future returned from the RemoveNamedTopologyResult. This is the main change since being approved @wcarlson5 @guozhangwang There's also the
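The `while true` plus `break` shape described in this comment can be sketched as follows. This is only an illustration against hypothetical stand-ins, not the code from the PR: `RetryableException`, `AlreadyDoneException`, and `resetWithRetries` are made-up names that loosely correspond to the "group still subscribed" and "group not found" branches in the diff above, and the attempt is a plain `Runnable` rather than an admin client call.

```java
// Hypothetical stand-in for "the group is still subscribed, try again later".
class RetryableException extends RuntimeException {
    RetryableException(final String message) { super(message); }
}

// Hypothetical stand-in for "another client already reset the offsets".
class AlreadyDoneException extends RuntimeException {
    AlreadyDoneException(final String message) { super(message); }
}

class OffsetResetSketch {
    // Runs `attempt` until it succeeds or reports that the work is already done.
    // Any other runtime exception propagates to the caller.
    // Returns the number of attempts that were made.
    static int resetWithRetries(final Runnable attempt, final long backoffMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.run();
                break; // success, nothing left to do
            } catch (final AlreadyDoneException e) {
                break; // no need to retry further
            } catch (final RetryableException e) {
                // fall through to the backoff below and try again
            }
            try {
                Thread.sleep(backoffMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                break;
            }
        }
        return attempts;
    }
}
```

Compared with looping on a nullable result variable, every exit from the loop is an explicit `break`, so the termination conditions can be read directly off the `catch` blocks.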
[GitHub] [kafka] cadonna commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags
cadonna commented on pull request #10802: URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063745684 @lkokhreidze Could you please rebase this PR on latest trunk? Some system tests fail probably due to the absence of a fix on the PR branch.
[GitHub] [kafka] ableegoldman commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823417406 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -239,12 +238,16 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo final boolean skipResetForUnstartedApplication = maybeCompleteFutureIfStillInCREATED(removeTopologyFuture, "removing topology " + topologyToRemove); -if (resetOffsets && !skipResetForUnstartedApplication) { +if (resetOffsets && !skipResetForUnstartedApplication && !partitionsToReset.isEmpty()) { Review comment: Moved the `!partitionsToReset.isEmpty()` check here to make sure we don't log the line about resetting offsets if we don't actually have any offsets to reset
[GitHub] [kafka] RivenSun2 commented on pull request #11819: MINOR: Optimize the generateFieldToString method of MessageDataGenerator
RivenSun2 commented on pull request #11819: URL: https://github.com/apache/kafka/pull/11819#issuecomment-1063741527 Hi @dajac @cmccabe, could you help review the PR? Thanks.
[jira] [Updated] (KAFKA-13722) Update internal interfaces that use ProcessorContext to use StateStoreContext instead
[ https://issues.apache.org/jira/browse/KAFKA-13722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax updated KAFKA-13722:
    Component/s: streams
> Update internal interfaces that use ProcessorContext to use StateStoreContext instead
>
> Key: KAFKA-13722
> URL: https://issues.apache.org/jira/browse/KAFKA-13722
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Priority: Major
>
> This is a reminder that when we remove the deprecated public APIs that use the ProcessorContext, like `StateStore.init`, we should also consider updating the internal interfaces that use the ProcessorContext. That includes:
> 1. Segments and related util classes which use ProcessorContext.
> 2. For state stores that rely on ProcessorContext.getXXXTime, the logic should be moved out of the state store impl to the processor node level that calls these state stores.
-- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Assigned] (KAFKA-8065) Forwarding modified timestamps does not reset timestamp correctly
[ https://issues.apache.org/jira/browse/KAFKA-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax reassigned KAFKA-8065:
    Assignee: Matthias J. Sax
> Forwarding modified timestamps does not reset timestamp correctly
>
> Key: KAFKA-8065
> URL: https://issues.apache.org/jira/browse/KAFKA-8065
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.1, 2.2.0, 2.1.1
> Reporter: Matthias J. Sax
> Assignee: Matthias J. Sax
> Priority: Major
> Fix For: 2.2.0, 2.0.2, 2.1.2
>
> Using the Processor API, users can set a new output record timestamp via `context.forward(..., To.all().withTimestamp(...))`. However, after the forward() call returns, the timestamp is not reset to the original input record timestamp, and thus a consecutive call to `context.forward(...)` without `To` will use the newly set output record timestamp from before, too.
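The behavior described in this ticket can be illustrated with a small self-contained model. This is a sketch under stated assumptions, not Kafka Streams code: `SketchContext` and its `resetAfterForward` flag are hypothetical, and the two `forward` overloads only mimic forwarding with and without `To.all().withTimestamp(...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of a processor context; not the Kafka Streams class.
class SketchContext {
    private final long inputTimestamp;       // timestamp of the input record
    private final boolean resetAfterForward; // true models the fixed behavior
    private long currentTimestamp;
    final List<Long> forwardedTimestamps = new ArrayList<>();

    SketchContext(final long inputTimestamp, final boolean resetAfterForward) {
        this.inputTimestamp = inputTimestamp;
        this.resetAfterForward = resetAfterForward;
        this.currentTimestamp = inputTimestamp;
    }

    // Models forward(..., To.all().withTimestamp(timestamp)).
    void forward(final String value, final long timestamp) {
        currentTimestamp = timestamp;
        forwardedTimestamps.add(currentTimestamp);
        if (resetAfterForward) {
            currentTimestamp = inputTimestamp; // restore the input record timestamp
        }
    }

    // Models forward(...) without `To`: uses whatever timestamp the context holds.
    void forward(final String value) {
        forwardedTimestamps.add(currentTimestamp);
    }
}
```

With `resetAfterForward` set to `false`, a second `forward("b")` after `forward("a", 500L)` reuses 500 as its timestamp, which is the leak the ticket reports; with `true`, the second record falls back to the input record timestamp.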
[GitHub] [kafka] guozhangwang commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
guozhangwang commented on pull request #11868: URL: https://github.com/apache/kafka/pull/11868#issuecomment-1063695247 > Anyways I'll push a fix for this, and then we should be good to go Thanks @wcarlson5 for the report! @ableegoldman please feel free to move on afterwards.
[GitHub] [kafka] peterwanner commented on pull request #11855: MINOR: Clarify producer batch.size in upgrade docs
peterwanner commented on pull request #11855: URL: https://github.com/apache/kafka/pull/11855#issuecomment-1063660862 > @peterwanner , since it's reaching the end of the release. Please let me know if you can't work on it before next weekend. Thank you. @showuon OK, I will finish it this afternoon, please wait a little. Thank you so much for your reminder.
[GitHub] [kafka] ableegoldman commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman commented on pull request #11868: URL: https://github.com/apache/kafka/pull/11868#issuecomment-1063651734 Ah good catch @wcarlson5 , I need to move the offset reset logic out of the actual `removeNamedTopology` call and make sure we don't block on the offsets being removed until the `get()` -- otherwise we get deadlock if we have two Streams clients and try to remove a named topology from both from a single thread. (To clarify for anyone else, this is not really a realistic/recommended usage pattern for real applications, but it helps keep the tests simple and makes for a more intuitive blocking behavior anyways) Anyways I'll push a fix for this, and then we should be good to go
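The deadlock described in this comment, and why deferring the blocking work to `get()` avoids it, can be sketched with a toy model. Everything here is hypothetical (`SketchGroup`, `SketchClient`, `SketchRemoveResult`, and the timeout parameter are not part of the Kafka Streams API); the latch merely stands in for "every client in the group has dropped the topology".

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical shared state: counts down once per client that dropped the topology.
class SketchGroup {
    final CountDownLatch allDropped;
    int resetAttempts = 0;

    SketchGroup(final int clients) {
        this.allDropped = new CountDownLatch(clients);
    }
}

// Hypothetical result handle: the blocking work happens here, on the caller thread.
class SketchRemoveResult {
    private final SketchGroup group;

    SketchRemoveResult(final SketchGroup group) {
        this.group = group;
    }

    // Waits until every client dropped the topology, then "resets offsets".
    // Returns false if the wait timed out or was interrupted.
    boolean get(final long timeoutMs) {
        try {
            if (!group.allDropped.await(timeoutMs, TimeUnit.MILLISECONDS)) {
                return false;
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        group.resetAttempts++;
        return true;
    }
}

// Hypothetical client: removal itself never blocks.
class SketchClient {
    private final SketchGroup group;

    SketchClient(final SketchGroup group) {
        this.group = group;
    }

    SketchRemoveResult removeNamedTopology() {
        group.allDropped.countDown();
        return new SketchRemoveResult(group);
    }
}
```

If `removeNamedTopology` itself waited on the latch, a single thread calling it on the first client would never reach the second client, so the count could never hit zero. Returning a handle and blocking only in `get()` lets the thread issue both removals first.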
[GitHub] [kafka] wcarlson5 commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
wcarlson5 commented on pull request #11868: URL: https://github.com/apache/kafka/pull/11868#issuecomment-1063618150 `shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets` is deadlocking. We need to be able to have it be async a little or else it can't make progress
[GitHub] [kafka] guozhangwang commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
guozhangwang commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823308917 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl removeTopologyFuture, final Set partitionsToReset) { -if (!partitionsToReset.isEmpty()) { -removeTopologyFuture.whenComplete((v, throwable) -> { -if (throwable != null) { -removeTopologyFuture.completeExceptionally(throwable); -} -DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; -while (deleteOffsetsResult == null) { -try { -deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); -deleteOffsetsResult.all().get(); -} catch (final InterruptedException ex) { -ex.printStackTrace(); +final KafkaFutureImpl resetOffsetsFuture = new KafkaFutureImpl<>(); +try { +removeTopologyFuture.get(); Review comment: Got it, that makes a lot of sense, thanks!
[GitHub] [kafka] showuon merged pull request #11863: KAFKA-13689: Revert AbstractConfig code changes
showuon merged pull request #11863: URL: https://github.com/apache/kafka/pull/11863
[jira] [Resolved] (KAFKA-13717) KafkaConsumer.close throws authorization exception even when commit offsets is empty
[ https://issues.apache.org/jira/browse/KAFKA-13717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Luke Chen resolved KAFKA-13717.
    Fix Version/s: 3.2.0
    Assignee: Vincent Jiang
    Resolution: Fixed
> KafkaConsumer.close throws authorization exception even when commit offsets is empty
>
> Key: KAFKA-13717
> URL: https://issues.apache.org/jira/browse/KAFKA-13717
> Project: Kafka
> Issue Type: Bug
> Components: unit tests
> Reporter: Vincent Jiang
> Assignee: Vincent Jiang
> Priority: Major
> Fix For: 3.2.0
>
> When offsets is empty and the coordinator is unknown, KafkaConsumer.close doesn't throw an exception before commit https://github.com/apache/kafka/commit/4b468a9d81f7380f7197a2a6b859c1b4dca84bd9. After this commit, KafkaConsumer.close may throw an authorization exception.
>
> The root cause is that the commit changed the logic to call lookupCoordinator even if offsets is empty.
>
> Even if a consumer doesn't have access to a group or a topic, it might be better not to throw an authorization exception in this case, because the close() call doesn't actually access any resource.
[GitHub] [kafka] showuon commented on pull request #11855: MINOR: Clarify producer batch.size in upgrade docs
showuon commented on pull request #11855: URL: https://github.com/apache/kafka/pull/11855#issuecomment-1063601218 @peterwanner , since it's reaching the end of the release. Please let me know if you can't work on it before next weekend. Thank you.
[GitHub] [kafka] showuon merged pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty
showuon merged pull request #11864: URL: https://github.com/apache/kafka/pull/11864
[GitHub] [kafka] showuon commented on pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty
showuon commented on pull request #11864: URL: https://github.com/apache/kafka/pull/11864#issuecomment-1063598542 Failed tests are unrelated.
```
Build / PowerPC / kafka.network.SocketServerTest.testIdleConnection()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOffsetSyncsTopicsOnTarget()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
Build / JDK 11 and Scala 2.13 / kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPreferredReplicaElection, Security=PLAINTEXT
Build / JDK 17 and Scala 2.13 / kafka.api.ConsumerBounceTest.testClose()
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
Build / JDK 17 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
```
[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration
[ https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503939#comment-17503939 ]
Harsha Madduri commented on KAFKA-10160:
Is there a reason we are holding this from going into trunk? [~ryannedolan] , [~sbellapu]
> Kafka MM2 consumer configuration
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.5.0, 2.4.1
> Reporter: Pavol Ipoth
> Assignee: sats
> Priority: Major
> Labels: configuration, kafka, mirror-maker
> Fix For: 2.4.2, 2.8.0
>
> According to https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51, producer/consumer-level properties should be configured as e.g. somesource->sometarget.consumer.client.id. I tried to set somesource->sometarget.consumer.auto.offset.reset=latest, but without success: the consumer always tries to fetch from earliest. I am not sure whether this is a bug or a misconfiguration on my side, but at the least an update to the docs would be useful.
[GitHub] [kafka] showuon commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty
showuon commented on a change in pull request #11864: URL: https://github.com/apache/kafka/pull/11864#discussion_r823268006 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ## @@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() { public RequestFuture commitOffsetsAsync(final Map offsets, final OffsetCommitCallback callback) { invokeCompletedOffsetCommitCallbacks(); -RequestFuture future = null; -if (!coordinatorUnknown()) { +RequestFuture future = null; +if (offsets.isEmpty()) { +// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally. +future = doCommitOffsetsAsync(offsets, callback); +} else if (!coordinatorUnknown()) { future = doCommitOffsetsAsync(offsets, callback); Review comment: I checked again, and I think it's OK. Thanks.
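The branch order in this diff can be modeled in isolation as follows. This is a hedged sketch, not the `ConsumerCoordinator` implementation: `CommitPathSketch`, its string return values, and the `coordinatorLookups` counter are hypothetical, and the final `else` branch (a coordinator lookup when the coordinator is unknown) is an assumption based on the KAFKA-13717 description rather than on code shown in the diff.

```java
import java.util.Map;

// Hypothetical miniature of the async commit path; not the real ConsumerCoordinator.
class CommitPathSketch {
    private final boolean coordinatorKnown;
    int coordinatorLookups = 0; // counts how often a lookup would be triggered

    CommitPathSketch(final boolean coordinatorKnown) {
        this.coordinatorKnown = coordinatorKnown;
    }

    // Returns a label for the path taken, in the same branch order as the diff.
    String commitOffsetsAsync(final Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // Nothing to send, so the coordinator is never consulted.
            return "completed-locally";
        } else if (coordinatorKnown) {
            return "sent-to-coordinator";
        } else {
            // Assumed behavior: look up the coordinator before committing.
            coordinatorLookups++;
            return "waiting-for-coordinator";
        }
    }
}
```

Checking `offsets.isEmpty()` first means an empty commit, such as the one issued on close, cannot surface an authorization failure from a coordinator lookup.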
[GitHub] [kafka] showuon commented on a change in pull request #11874: Fix typos in configuration docs
showuon commented on a change in pull request #11874: URL: https://github.com/apache/kafka/pull/11874#discussion_r823265635 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -404,8 +404,8 @@ "org.apache.kafka.common.serialization.Serde interface."; public static final String WINDOWED_INNER_CLASS_SERDE = "windowed.inner.class.serde"; -private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the \" +\n" + -"\"org.apache.kafka.common.serialization.Serde interface.. Note that setting this config in KafkaStreams application would result " + +private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " + +"org.apache.kafka.common.serialization.Serde interface. Note that setting this config in KafkaStreams application would result " + Review comment: Nice catch! Note, current doc is like this: ![image](https://user-images.githubusercontent.com/43372967/157572498-c1200a0f-ac13-4f8d-b991-7d2f2e2e1d11.png) ## File path: clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ## @@ -216,8 +216,10 @@ private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking." + " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of" + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)." -+ " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." 
-+ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled."; ++ " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "." ++ " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. " ++ " Record ordering is preserved when enable.idempotence is set to true for idempotent " ++ " producer (or transactional producer), even when max in-flight requests are greater than 1 (supported up to 5)."; Review comment: Actually, I think the original version is better. Could you revert this change, and only fix the typo in stream config?
[GitHub] [kafka] showuon edited a comment on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags
showuon edited a comment on pull request #10802: URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063569496 I'll start to take a look this week. Sorry for the late response.
[GitHub] [kafka] showuon commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags
showuon commented on pull request #10802: URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063569496 I'll start to take a look this week. Sorry for the late response.
[jira] [Commented] (KAFKA-13690) Flaky test EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
[ https://issues.apache.org/jira/browse/KAFKA-13690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503932#comment-17503932 ]
Guozhang Wang commented on KAFKA-13690:
May cover `shouldCommitCorrectOffsetIfInputTopicIsTransactional`.
> Flaky test EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
>
> Key: KAFKA-13690
> URL: https://issues.apache.org/jira/browse/KAFKA-13690
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Priority: Major
>
> The _at_least_once_ version of the "{*}EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown"{*} test is occasionally failing with
> h3. Error Message
> java.lang.AssertionError: The committed records do not match what expected
> Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45)]>
> but: was <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 10), KeyValue(0, 11), KeyValue(0, 13), KeyValue(0, 16), KeyValue(0, 20), KeyValue(0, 25), KeyValue(0, 31), KeyValue(0, 38)]>
>
> Seems we are receiving more than the expected records.
> ...of course, this is an ALOS flavor of the {*}EOS{*}IntegrationTest, so perhaps we shouldn't be running this variant at all? Not sure if this explains the exact output we receive but it certainly seems suspicious
>
> Added at_least_once in [https://github.com/apache/kafka/pull/11283]
[GitHub] [kafka] ableegoldman commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823239035 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl removeTopologyFuture, final Set partitionsToReset) { -if (!partitionsToReset.isEmpty()) { -removeTopologyFuture.whenComplete((v, throwable) -> { -if (throwable != null) { -removeTopologyFuture.completeExceptionally(throwable); -} -DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; -while (deleteOffsetsResult == null) { -try { -deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); -deleteOffsetsResult.all().get(); -} catch (final InterruptedException ex) { -ex.printStackTrace(); +final KafkaFutureImpl resetOffsetsFuture = new KafkaFutureImpl<>(); +try { +removeTopologyFuture.get(); Review comment: Yeah that is the main fix, however I realized that we are currently in this awkward state of pseudo-async-ness and I think we might ultimately want to scratch this whole `RemoveNamedTopologyResult` and just make it fully blocking. Though I didn't want to go ahead and change the method signatures just yet, so I just have it block on the named topology future and then perform the offset reset. The actual advantage here is that before this, we were actually making the StreamThread who completed the future perform the offset reset, which of course means it gets stuck for a bit and can't continue processing until basically the whole group has dropped this named topology. Better to have the caller thread do the offset reset to let the StreamThreads keep processing the other topologies. (When we get to finally doing a KIP maybe we can discuss having a blocking and non-blocking option for these, but my feeling is let's not complicate things unnecessarily and it may be that we only really need a blocking version)
[GitHub] [kafka] ableegoldman commented on pull request #11867: KAFKA-12648: fix #getMinThreadVersion and include IOException + topologyName in StreamsException when topology dir cleanup fails
ableegoldman commented on pull request #11867: URL: https://github.com/apache/kafka/pull/11867#issuecomment-1063520672 Merged to trunk
[GitHub] [kafka] ableegoldman merged pull request #11867: KAFKA-12648: fix #getMinThreadVersion and include IOException + topologyName in StreamsException when topology dir cleanup fails
ableegoldman merged pull request #11867: URL: https://github.com/apache/kafka/pull/11867
[GitHub] [kafka] ableegoldman commented on pull request #11867: KAFKA-12648: fix #getMinThreadVersion and include IOException + topologyName in StreamsException when topology dir cleanup fails
ableegoldman commented on pull request #11867: URL: https://github.com/apache/kafka/pull/11867#issuecomment-1063520314 Found a somewhat more involved bug that was responsible for this test failing, so I opened a second bugfix PR, which you can find here. Without this followup PR, the test may continue to see failures (as we see in the build results here). Otherwise the test failures are unrelated, so I am going to merge this.
[GitHub] [kafka] lihaosky commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore
lihaosky commented on a change in pull request #11802: URL: https://github.com/apache/kafka/pull/11802#discussion_r823220988 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java ## @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.Arrays; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.kafka.streams.state.StateSerdes; + +import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE; +import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize; + +public class PrefixedWindowKeySchemas { + +private static final int PREFIX_SIZE = 1; +private static final byte TIME_FIRST_PREFIX = 0; +private static final byte KEY_FIRST_PREFIX = 1; +private static final int SEQNUM_SIZE = 4; +private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE; + +private static byte extractPrefix(final byte[] binaryBytes) { +return binaryBytes[0]; +} + +public static class TimeFirstWindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { + +@Override +public Bytes upperRange(final Bytes key, final long to) { +if (key == null) { +// Put next prefix instead of null so that we can start from right prefix +// when scanning backwards +final byte nextPrefix = TIME_FIRST_PREFIX + 1; +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE).put(nextPrefix).array()); +} +byte[] maxKey = new byte[key.get().length]; +Arrays.fill(maxKey, (byte) 0xFF); +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE + maxKey.length + SEQNUM_SIZE) +.put(TIME_FIRST_PREFIX) +.putLong(to) +.put(maxKey).putInt(Integer.MAX_VALUE) +.array()); +} + +@Override +public Bytes lowerRange(final Bytes key, final long from) { +if (key == null) { +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE) +.put(TIME_FIRST_PREFIX) +.putLong(from) +.array()); +} + +/* + * Larger timestamp or key's byte order can't be smaller than this lower range. Reason: + * 1. 
Timestamp is fixed length (with big endian byte order). Since we put timestamp + *first, larger timestamp will have larger byte order. + * 2. If timestamp is the same but key (k1) is larger than this lower range key (k2): + * a. If k2 is not a prefix of k1, then k1 will always have larger byte order no + *matter what seqnum k2 has + * b. If k2 is a prefix of k1, since k2's seqnum is 0, after k1 appends seqnum, + *it will always be larger than (k1 + seqnum). + */ +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE + key.get().length) +.put(TIME_FIRST_PREFIX) +.putLong(from) +.put(key.get()) +.array()); +} + +@Override +public Bytes lowerRangeFixedSize(final Bytes key, final long from) { +return TimeFirstWindowKeySchema.toStoreKeyBinary(key, Math.max(0, from), +0); +} + +@Override +public Bytes upperRangeFixedSize(final Bytes key, final long to) { +return TimeFirstWindowKeySchema.toStoreKeyBinary(key, to, Integer.MAX_VALUE); +} + +@Override +public long segmentTimestamp(final Bytes key) { +return TimeFirstWindowKeySchema.extractStoreTimestamp(key.get()); +} + +@Override +public HasNextCondition hasNextCondition(final Bytes
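The ordering argument in the `lowerRange` comment above can be checked with a small self-contained sketch. It rebuilds only the byte layout shown in the diff (prefix, big-endian timestamp, key bytes, seqnum) using plain JDK classes; the class name, the helper names, and the use of `Arrays.compareUnsigned` as the comparator are assumptions for illustration, not the PR's implementation. The timestamp argument relies on non-negative timestamps, since the comparison is unsigned.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class TimeFirstKeySketch {
    static final byte TIME_FIRST_PREFIX = 0;

    // Full store key: [prefix:1][timestamp:8, big endian][key bytes][seqnum:4]
    static byte[] storeKey(byte[] key, long timestamp, int seqnum) {
        return ByteBuffer.allocate(1 + Long.BYTES + key.length + Integer.BYTES)
            .put(TIME_FIRST_PREFIX)
            .putLong(timestamp)
            .put(key)
            .putInt(seqnum)
            .array();
    }

    // Lower range bound: [prefix:1][from:8][key bytes], with no seqnum appended
    static byte[] lowerRange(byte[] key, long from) {
        return ByteBuffer.allocate(1 + Long.BYTES + key.length)
            .put(TIME_FIRST_PREFIX)
            .putLong(from)
            .put(key)
            .array();
    }

    // Unsigned lexicographic comparison (Java 9+), assumed here to mirror the store's byte order
    static int compare(byte[] a, byte[] b) {
        return Arrays.compareUnsigned(a, b);
    }

    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] lower = lowerRange(utf8("a"), 10L);
        // Same timestamp and key: the bound is a strict prefix of the store key, so it sorts first.
        System.out.println(compare(storeKey(utf8("a"), 10L, 0), lower) > 0);
        // Same timestamp, longer key that starts with the bound's key (case 2b in the comment).
        System.out.println(compare(storeKey(utf8("ab"), 10L, 0), lower) > 0);
        // Smaller timestamp sorts below the bound regardless of seqnum.
        System.out.println(compare(storeKey(utf8("a"), 9L, Integer.MAX_VALUE), lower) < 0);
    }
}
```

Because the bound omits the seqnum, it is a byte prefix of every store key with the same timestamp and key, which is why it sorts at or below all of them.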
[jira] [Comment Edited] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one
[ https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503911#comment-17503911 ] Haruki Okada edited comment on KAFKA-10690 at 3/10/22, 12:14 AM: - Thanks for the comment. [~showuon] > Are you sure this issue is due to the `in-sync` replica fetch? Yeah, as long as replica fetch is `out-of-sync`, it doesn't block produce-request so the issue happens only on `in-sync` replica when `in-sync` replica fetching and `out-of-sync` replica fetching are done in same replica fetcher thread on follower side. > Could you have a PoC to add an additional thread pool for lagging replica to > confirm this solution? Haven't tried, as we wanted to confirm if anyone encounter similar issue or not (and if anyone addressed it in some way) first. But let us consider! [~junrao] > Have you tried enabling replication throttling? Yeah, we use replication throttling, and we suppose disk's performance itself is stable even on lagging-replica fetch. We use HDD, so reading the data takes few~tens of milliseconds per IO even it's stable. So if lagging replica fetch (likely not in page cache so causes disk reads) and in-sync replica fetch are done in same replica fetcher thread, in-sync one greatly affected by due to lagging one. was (Author: ocadaruma): Thanks for the comment. [~showuon] > Are you sure this issue is due to the `in-sync` replica fetch? Yeah, as long as replica fetch is `out-of-sync`, it doesn't block produce-request so the issue happens only on `in-sync` replica when `in-sync` replica fetching and `out-of-sync` replica fetching are done in same replica fetcher thread on follower side. > Could you have a PoC to add an additional thread pool for lagging replica to > confirm this solution? Haven't tried, as we wanted to confirm if anyone encounter similar issue or not (and if anyone addressed it in some way) first. But let us consider! [~junrao] > Have you tried enabling replication throttling? 
Yeah, we use replication throttling, and we suppose disk's performance itself is stable even on lagging-replica fetch. We use HDD, so reading the data takes few~tens of milliseconds per IO even it's stable. So if lagging replica fetch (likely not in page cache so causes disk reads) and in-sync replica fetch are done in same replica fetcher thread (i.e. in same Fetch request), in-sync one greatly affected by due to lagging one. > Produce-response delay caused by lagging replica fetch which affects in-sync > one > > > Key: KAFKA-10690 > URL: https://issues.apache.org/jira/browse/KAFKA-10690 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.1 >Reporter: Haruki Okada >Priority: Major > Attachments: image-2020-11-06-11-15-21-781.png, > image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png > > > h2. Our environment > * Kafka version: 2.4.1 > h2. Phenomenon > * Produce response time 99th (remote scope) degrades to 500ms, which is 20 > times worse than usual > ** Meanwhile, the cluster was running replica reassignment to service-in new > machine to recover replicas which held by failed (Hardware issue) broker > machine > !image-2020-11-06-11-15-21-781.png|width=292,height=166! > h2. Analysis > Let's say > * broker-X: The broker we observed produce latency degradation > * broker-Y: The broker under servicing-in > broker-Y was catching up replicas of partitions: > * partition-A: has relatively small log size > * partition-B: has large log size > (actually, broker-Y was catching-up many other partitions. I noted only two > partitions here to make explanation simple) > broker-X was the leader for both partition-A and partition-B. > We found that both partition-A and partition-B are assigned to same > ReplicaFetcherThread of broker-Y, and produce latency started to degrade > right after broker-Y finished catching up partition-A. > !image-2020-11-06-11-17-09-910.png|width=476,height=174! 
> Besides, we observed disk reads on broker-X during service-in. (This is > natural since old segments are likely not in page cache) > !image-2020-11-06-11-15-38-390.png|width=292,height=193! > So we suspected that: > * In-sync replica fetch (partition-A) was involved by lagging replica fetch > (partition-B), which should be slow because it causes actual disk reads > ** Since ReplicaFetcherThread sends fetch requests in blocking manner, next > fetch request can't be sent until one fetch request completes > ** => Causes in-sync replica fetch for partitions assigned to same replica > fetcher thread to delay > ** => Causes remote scope produce latency degradation > h2. Possible fix > We
[GitHub] [kafka] guozhangwang commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore
guozhangwang commented on a change in pull request #11802: URL: https://github.com/apache/kafka/pull/11802#discussion_r823215870 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java ## @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.Arrays; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.kafka.streams.state.StateSerdes; + +import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE; +import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize; + +public class PrefixedWindowKeySchemas { + +private static final int PREFIX_SIZE = 1; +private static final byte TIME_FIRST_PREFIX = 0; +private static final byte KEY_FIRST_PREFIX = 1; +private static final int SEQNUM_SIZE = 4; +private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE; + +private static byte extractPrefix(final byte[] binaryBytes) { +return binaryBytes[0]; +} + +public static class TimeFirstWindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { + +@Override +public Bytes upperRange(final Bytes key, final long to) { +if (key == null) { +// Put next prefix instead of null so that we can start from right prefix +// when scanning backwards +final byte nextPrefix = TIME_FIRST_PREFIX + 1; +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE).put(nextPrefix).array()); +} +byte[] maxKey = new byte[key.get().length]; +Arrays.fill(maxKey, (byte) 0xFF); +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE + maxKey.length + SEQNUM_SIZE) +.put(TIME_FIRST_PREFIX) +.putLong(to) +.put(maxKey).putInt(Integer.MAX_VALUE) +.array()); +} + +@Override +public Bytes lowerRange(final Bytes key, final long from) { +if (key == null) { +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE) +.put(TIME_FIRST_PREFIX) +.putLong(from) +.array()); +} + +/* + * Larger timestamp or key's byte order can't be smaller than this lower range. Reason: + * 1. 
Timestamp is fixed length (with big endian byte order). Since we put timestamp + *first, larger timestamp will have larger byte order. + * 2. If timestamp is the same but key (k1) is larger than this lower range key (k2): + * a. If k2 is not a prefix of k1, then k1 will always have larger byte order no + *matter what seqnum k2 has + * b. If k2 is a prefix of k1, since k2's seqnum is 0, after k1 appends seqnum, + *it will always be larger than (k1 + seqnum). + */ +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE + key.get().length) +.put(TIME_FIRST_PREFIX) +.putLong(from) +.put(key.get()) +.array()); +} + +@Override +public Bytes lowerRangeFixedSize(final Bytes key, final long from) { +return TimeFirstWindowKeySchema.toStoreKeyBinary(key, Math.max(0, from), +0); +} + +@Override +public Bytes upperRangeFixedSize(final Bytes key, final long to) { +return TimeFirstWindowKeySchema.toStoreKeyBinary(key, to, Integer.MAX_VALUE); +} + +@Override +public long segmentTimestamp(final Bytes key) { +return TimeFirstWindowKeySchema.extractStoreTimestamp(key.get()); +} + +@Override +public HasNextCondition hasNextCondition(final Bytes
[GitHub] [kafka] vincent81jiang commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty
vincent81jiang commented on a change in pull request #11864: URL: https://github.com/apache/kafka/pull/11864#discussion_r823215546 ## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ## @@ -2483,6 +2483,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateAndCloseConsumerWithNoAccess(quorum: String): Unit = { +val consumer = createConsumer() +try { + // Close consumer without consuming anything. close() call should pass successfully and throw no exception. + consumer.close() +} catch { + case e: Throwable => +fail(s"Exception not expected on closing consumer: $e") +} Review comment: @showuon, sounds good. Updated to use assertDoesNotThrow. Note: I cannot pass "() => consumer.close()" directly to assertDoesNotThrow because it causes compile error: ``` ambiguous reference to overloaded definition, both method assertDoesNotThrow in class Assertions of type [T](x$1: org.junit.jupiter.api.function.ThrowingSupplier[T], x$2: String): T and method assertDoesNotThrow in class Assertions of type (x$1: org.junit.jupiter.api.function.Executable, x$2: String): Unit match argument types (() => Unit,String) ```
[jira] [Commented] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one
[ https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503911#comment-17503911 ] Haruki Okada commented on KAFKA-10690: -- Thanks for the comment. [~showuon] > Are you sure this issue is due to the `in-sync` replica fetch? Yeah, as long as replica fetch is `out-of-sync`, it doesn't block produce-request so the issue happens only on `in-sync` replica when `in-sync` replica fetching and `out-of-sync` replica fetching are done in same replica fetcher thread on follower side. > Could you have a PoC to add an additional thread pool for lagging replica to > confirm this solution? Haven't tried, as we wanted to confirm if anyone encounter similar issue or not (and if anyone addressed it in some way) first. But let us consider! [~junrao] > Have you tried enabling replication throttling? Yeah, we use replication throttling, and we suppose disk's performance itself is stable even on lagging-replica fetch. We use HDD, so reading the data takes few~tens of milliseconds per IO even it's stable. So if lagging replica fetch (likely not in page cache so causes disk reads) and in-sync replica fetch are done in same replica fetcher thread (i.e. in same Fetch request), in-sync one greatly affected by due to lagging one. > Produce-response delay caused by lagging replica fetch which affects in-sync > one > > > Key: KAFKA-10690 > URL: https://issues.apache.org/jira/browse/KAFKA-10690 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.4.1 >Reporter: Haruki Okada >Priority: Major > Attachments: image-2020-11-06-11-15-21-781.png, > image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png > > > h2. Our environment > * Kafka version: 2.4.1 > h2. 
Phenomenon > * Produce response time 99th (remote scope) degrades to 500ms, which is 20 > times worse than usual > ** Meanwhile, the cluster was running replica reassignment to service-in new > machine to recover replicas which held by failed (Hardware issue) broker > machine > !image-2020-11-06-11-15-21-781.png|width=292,height=166! > h2. Analysis > Let's say > * broker-X: The broker we observed produce latency degradation > * broker-Y: The broker under servicing-in > broker-Y was catching up replicas of partitions: > * partition-A: has relatively small log size > * partition-B: has large log size > (actually, broker-Y was catching-up many other partitions. I noted only two > partitions here to make explanation simple) > broker-X was the leader for both partition-A and partition-B. > We found that both partition-A and partition-B are assigned to same > ReplicaFetcherThread of broker-Y, and produce latency started to degrade > right after broker-Y finished catching up partition-A. > !image-2020-11-06-11-17-09-910.png|width=476,height=174! > Besides, we observed disk reads on broker-X during service-in. (This is > natural since old segments are likely not in page cache) > !image-2020-11-06-11-15-38-390.png|width=292,height=193! > So we suspected that: > * In-sync replica fetch (partition-A) was involved by lagging replica fetch > (partition-B), which should be slow because it causes actual disk reads > ** Since ReplicaFetcherThread sends fetch requests in blocking manner, next > fetch request can't be sent until one fetch request completes > ** => Causes in-sync replica fetch for partitions assigned to same replica > fetcher thread to delay > ** => Causes remote scope produce latency degradation > h2. Possible fix > We think this issue can be addressed by designating part of > ReplicaFetcherThread (or creating another thread pool) for lagging replica > catching-up, but not so sure this is the appropriate way. > Please give your opinions about this issue. 
-- This message was sent by Atlassian Jira (v8.20.1#820001)
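The suspected head-of-line blocking in KAFKA-10690 can be put into rough numbers with a toy model. This sketch assumes that a fetcher thread sends one blocking fetch at a time and that the leader responds only after reading every partition in that request; the latency values are illustrative placeholders, not measurements from the ticket, and the class and method names are hypothetical.

```java
public class FetcherBlockingModel {

    // Toy model: delay seen by the in-sync partition for one fetch round.
    // Shared fetcher: it also waits for the lagging partition's disk read.
    // Dedicated fetcher: it only pays for its own (page cache) read.
    static long inSyncFetchDelayMs(long inSyncReadMs, long laggingReadMs, boolean sharedFetcher) {
        return sharedFetcher ? inSyncReadMs + laggingReadMs : inSyncReadMs;
    }

    public static void main(String[] args) {
        long pageCacheReadMs = 1; // illustrative value, not measured
        long diskReadMs = 20;     // illustrative value for an HDD read, not measured
        System.out.println("shared fetcher:    "
            + inSyncFetchDelayMs(pageCacheReadMs, diskReadMs, true) + " ms");
        System.out.println("dedicated fetcher: "
            + inSyncFetchDelayMs(pageCacheReadMs, diskReadMs, false) + " ms");
    }
}
```

Under these assumptions the in-sync partition's replication delay is dominated by the lagging partition's disk read whenever both share a fetcher thread, which is the motivation for the proposed separate thread pool.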
[GitHub] [kafka] guozhangwang commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore
guozhangwang commented on a change in pull request #11802: URL: https://github.com/apache/kafka/pull/11802#discussion_r823200925 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java ## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED; + +public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore implements SegmentedBytesStore { +private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class); + +private final String name; +protected final AbstractSegments segments; +private final String metricScope; +protected final KeySchema baseKeySchema; +protected final Optional indexKeySchema; + + +private ProcessorContext context; +private StateStoreContext stateStoreContext; Review comment: Filed https://issues.apache.org/jira/browse/KAFKA-13722
[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on pull request #11796: URL: https://github.com/apache/kafka/pull/11796#issuecomment-1063494691 cc @ableegoldman
[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
guozhangwang commented on pull request #11796: URL: https://github.com/apache/kafka/pull/11796#issuecomment-1063494547 Re-triggered jenkins.
[GitHub] [kafka] wcarlson5 edited a comment on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
wcarlson5 edited a comment on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1063493748 @vvcephei @cadonna
[GitHub] [kafka] wcarlson5 commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
wcarlson5 commented on pull request #11873: URL: https://github.com/apache/kafka/pull/11873#issuecomment-1063493748 @vvcephei
[GitHub] [kafka] norwood closed pull request #6508: MINOR: replication factor is short
norwood closed pull request #6508: URL: https://github.com/apache/kafka/pull/6508
[GitHub] [kafka] guozhangwang commented on pull request #11858: [Emit final][3/N] add emit final param to relevant windows
guozhangwang commented on pull request #11858: URL: https://github.com/apache/kafka/pull/11858#issuecomment-1063480596 BTW, since this is an API change, I think the above discussion should also happen on the email thread rather than just on this PR. So the sooner we put up a KIP for public discussion, the better.
[GitHub] [kafka] joel-hamill opened a new pull request #11874: Fix typos in configuration docs
joel-hamill opened a new pull request #11874: URL: https://github.com/apache/kafka/pull/11874
[jira] [Commented] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503891#comment-17503891 ] Randall Hauch commented on KAFKA-12879: --- The approach we decided to take was to revert the previous admin client changes from KAFKA-12339 to bring the admin client behavior back to previous expectations, and to implement retries within the KafkaBasedLog to handle cases like those identified in that issue. For example, a likely root cause of KAFKA-12339 was a Connect worker instantiates its KafkaConfigBackingStore (and other internal topic stores), which creates a KafkaBasedLog that as part of start() creates the topic if it doesn't exist and then immediately tries to read the offsets. That reading of offsets can fail if the metadata for the newly created topic hasn't been propagated to all of the brokers. We can solve this particular root cause easily by retrying the reading of offsets within the KafkaBasedLog's start() method, and since topic metadata should be propagated relatively quickly, we don't need to retry for that long – and most of the time we'd probably successfully retry within a few retries. I've just merged to trunk a PR that does this. When trying to backport this, some of the newer tests were flaky, so [~pnee] created another PR (plus another) to hopefully eliminate that flakiness, and it seemed to work. I'm in the process of backporting this all the way back to 2.5 branch, since that's how far back the regression from KAFKA-12339 was backported. > Compatibility break in Admin.listOffsets() > -- > > Key: KAFKA-12879 > URL: https://issues.apache.org/jira/browse/KAFKA-12879 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0, 2.7.1, 2.6.2 >Reporter: Tom Bentley >Assignee: Philip Nee >Priority: Major > > KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). > Previously it would fail with {{UnknownTopicOrPartitionException}} when a > topic didn't exist. 
Now it will (eventually) fail with {{TimeoutException}}. > It seems this was more or less intentional, even though it would break code > which was expecting and handling the {{UnknownTopicOrPartitionException}}. A > workaround is to use {{retries=1}} and inspect the cause of the > {{TimeoutException}}, but this isn't really suitable for cases where the same > Admin client instance is being used for other calls where retries is > desirable. > Furthermore as well as the intended effect on {{listOffsets()}} it seems that > the change could actually affect other methods of Admin. > More generally, the Admin client API is vague about which exceptions can > propagate from which methods. This means that it's not possible to say, in > cases like this, whether the calling code _should_ have been relying on the > {{UnknownTopicOrPartitionException}} or not.
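The bounded-retry approach described in the KAFKA-12879 comment above (retry reading offsets for a short time while topic metadata propagates) can be sketched generically. This is a minimal illustration under stated assumptions: `TransientException` is a hypothetical stand-in for a retriable failure such as an unknown-topic error, and `retry` is an invented helper, not the code merged into `KafkaBasedLog`.

```java
import java.util.concurrent.Callable;

public class BoundedRetrySketch {

    // Hypothetical stand-in for a retriable failure (e.g. metadata not yet propagated).
    static class TransientException extends RuntimeException {
        TransientException(String message) {
            super(message);
        }
    }

    // Runs the action up to maxAttempts times, sleeping backoffMs between attempts.
    // Only TransientException is retried; any other exception propagates immediately.
    static <T> T retry(Callable<T> action, int maxAttempts, long backoffMs) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        TransientException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (TransientException e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMs);
                }
            }
        }
        throw last; // all attempts failed with a retriable error
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulated offset read that fails twice before the "metadata" becomes available.
        long endOffset = retry(() -> {
            if (++calls[0] < 3) {
                throw new TransientException("topic metadata not available yet");
            }
            return 42L;
        }, 5, 10L);
        System.out.println("read offset " + endOffset + " after " + calls[0] + " attempts");
    }
}
```

Keeping the retry at the call site, rather than inside the admin client, is what lets other callers keep seeing the original exception immediately.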
[jira] [Updated] (KAFKA-13725) KIP-768 OAuth code mixes public and internal classes in same package
[ https://issues.apache.org/jira/browse/KAFKA-13725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-13725: -- Description: The {{org.apache.kafka.common.security.oauthbearer.secured}} package from KIP-768 incorrectly mixed all of the classes (public and internal) in the package together. This bug is to remove all but the public classes from that package and move the rest to a new {{org.apache.kafka.common.security.oauthbearer.secured.internal}} package. This should be back-ported to all versions in which the KIP-768 OAuth work occurs. was: The {{org.apache.kafka.common.security.oauthbearer.secured}} package from KIP-768 incorrectly mixed all of the classes (public and internal) in the package together. This bug is to remove all but the public classes from that package and move the rest to a new {{org.apache.kafka.common.security.oauthbearer.secured.internal}} package. This should be packported to all versions in which the KIP-768 OAuth work occurs. > KIP-768 OAuth code mixes public and internal classes in same package > > > Key: KAFKA-13725 > URL: https://issues.apache.org/jira/browse/KAFKA-13725 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.1.0, 3.2.0, 3.1.1 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{org.apache.kafka.common.security.oauthbearer.secured}} package from > KIP-768 incorrectly mixed all of the classes (public and internal) in the > package together. > This bug is to remove all but the public classes from that package and move > the rest to a new > {{org.apache.kafka.common.security.oauthbearer.secured.internal}} package. > This should be back-ported to all versions in which the KIP-768 OAuth work > occurs.
[GitHub] [kafka] kirktrue commented on pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured
kirktrue commented on pull request #11811: URL: https://github.com/apache/kafka/pull/11811#issuecomment-1063442392 I have filed KAFKA-13725 to address the package layout bug. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13725) KIP-768 OAuth code mixes public and internal classes in same package
[ https://issues.apache.org/jira/browse/KAFKA-13725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kirk True updated KAFKA-13725: -- Description: The {{org.apache.kafka.common.security.oauthbearer.secured}} package from KIP-768 incorrectly mixed all of the classes (public and internal) in the package together. This bug is to remove all but the public classes from that package and move the rest to a new {{org.apache.kafka.common.security.oauthbearer.secured.internal}} package. This should be packported to all versions in which the KIP-768 OAuth work occurs. > KIP-768 OAuth code mixes public and internal classes in same package > > > Key: KAFKA-13725 > URL: https://issues.apache.org/jira/browse/KAFKA-13725 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.1.0, 3.2.0, 3.1.1 >Reporter: Kirk True >Assignee: Kirk True >Priority: Major > > The {{org.apache.kafka.common.security.oauthbearer.secured}} package from > KIP-768 incorrectly mixed all of the classes (public and internal) in the > package together. > This bug is to remove all but the public classes from that package and move > the rest to a new > {{org.apache.kafka.common.security.oauthbearer.secured.internal}} package. > This should be packported to all versions in which the KIP-768 OAuth work > occurs. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] wcarlson5 opened a new pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"
wcarlson5 opened a new pull request #11873: URL: https://github.com/apache/kafka/pull/11873 This reverts commit 2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13. We were seeing serious regressions in our state-heavy benchmarks. We are not yet sure how this commit caused them, but reverting it fixed the issues that started with it.
[jira] [Created] (KAFKA-13726) Fix Vulnerability CVE-2022-23181 -Upgrade org.apache.tomcat.embed_tomcat-embed-core
Chris Sabelstrom created KAFKA-13726: Summary: Fix Vulnerability CVE-2022-23181 -Upgrade org.apache.tomcat.embed_tomcat-embed-core Key: KAFKA-13726 URL: https://issues.apache.org/jira/browse/KAFKA-13726 Project: Kafka Issue Type: Bug Affects Versions: 2.8.1 Reporter: Chris Sabelstrom Our security scanner detected the following vulnerability. Please upgrade to the version noted in the Fix Status column.
|CVE ID|Severity|Packages|Package Version|CVSS|Fix Status|
|CVE-2022-23181|high|org.apache.tomcat.embed_tomcat-embed-core|9.0.54|7|fixed in 10.0.0, 9.0.1|
[jira] [Created] (KAFKA-13725) KIP-768 OAuth code mixes public and internal classes in same package
Kirk True created KAFKA-13725: - Summary: KIP-768 OAuth code mixes public and internal classes in same package Key: KAFKA-13725 URL: https://issues.apache.org/jira/browse/KAFKA-13725 Project: Kafka Issue Type: Bug Affects Versions: 3.1.0, 3.2.0, 3.1.1 Reporter: Kirk True Assignee: Kirk True
[jira] [Created] (KAFKA-13724) Fix Vulnerability CVE-2021-43859 - Upgrade com.thoughtworks.xstream_xstream
Chris Sabelstrom created KAFKA-13724: Summary: Fix Vulnerability CVE-2021-43859 - Upgrade com.thoughtworks.xstream_xstream Key: KAFKA-13724 URL: https://issues.apache.org/jira/browse/KAFKA-13724 Project: Kafka Issue Type: Bug Affects Versions: 2.8.1 Reporter: Chris Sabelstrom Our security scanner detected the following vulnerability. Please upgrade to the version noted in the Fix Status column.
|CVE ID|Severity|Packages|Package Version|CVSS|Fix Status|
|CVE-2021-43859|high|com.thoughtworks.xstream_xstream|1.4.18|7.5|fixed in 1.4.19|
[GitHub] [kafka] guozhangwang commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
guozhangwang commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823130503 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java ## @@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl removeTopologyFuture, final Set partitionsToReset) { -if (!partitionsToReset.isEmpty()) { -removeTopologyFuture.whenComplete((v, throwable) -> { -if (throwable != null) { -removeTopologyFuture.completeExceptionally(throwable); -} -DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; -while (deleteOffsetsResult == null) { -try { -deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); -deleteOffsetsResult.all().get(); -} catch (final InterruptedException ex) { -ex.printStackTrace(); +final KafkaFutureImpl resetOffsetsFuture = new KafkaFutureImpl<>(); +try { +removeTopologyFuture.get(); Review comment: Why do we now have to wait on the first future before moving forward to construct the second future? I thought the main fix is only in https://github.com/apache/kafka/pull/11868/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR1132 above, and that with it we would not need to change the behavior to wait for the topology removal to complete?
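The question in the review comment above contrasts two shapes: blocking the caller on the first future, or chaining the second step onto it. A minimal sketch of both, assuming `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture`; the class name, `deleteOffsets`, `resetOffsetsBlocking`, and `resetOffsetsChained` are hypothetical and are not the code in the pull request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureChainingSketch {

    // Counts how often the (hypothetical) offset reset ran.
    static final AtomicInteger resets = new AtomicInteger();

    // Hypothetical stand-in for the real offset deletion call.
    static void deleteOffsets() {
        resets.incrementAndGet();
    }

    // Blocking shape: the calling thread waits for the removal, then resets.
    static CompletableFuture<Void> resetOffsetsBlocking(final CompletableFuture<Void> removeTopologyFuture) {
        final CompletableFuture<Void> resetOffsetsFuture = new CompletableFuture<>();
        try {
            removeTopologyFuture.join(); // blocks until the removal completes
            deleteOffsets();
            resetOffsetsFuture.complete(null);
        } catch (final RuntimeException e) {
            // join() reports failures as unchecked exceptions
            resetOffsetsFuture.completeExceptionally(e);
        }
        return resetOffsetsFuture;
    }

    // Chained shape: returns at once; the reset runs when the removal completes.
    static CompletableFuture<Void> resetOffsetsChained(final CompletableFuture<Void> removeTopologyFuture) {
        return removeTopologyFuture.thenRun(FutureChainingSketch::deleteOffsets);
    }

    public static void main(final String[] args) {
        final CompletableFuture<Void> removal = new CompletableFuture<>();
        final CompletableFuture<Void> reset = resetOffsetsChained(removal);
        System.out.println("done before removal: " + reset.isDone()); // false
        removal.complete(null);
        System.out.println("done after removal: " + reset.isDone());  // true
    }
}
```

The sketch takes no position on which shape the pull request should use; it only shows that the chained form lets the caller proceed before the first future completes, while the blocking form does not.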
[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503865#comment-17503865 ] Chris Egerton edited comment on KAFKA-12879 at 3/9/22, 9:51 PM: Are there any practical use cases for listing the offsets of a just-created topic? Are any of these use cases more likely than ones that would involve describing a just-created topic? It seems a little heavy-handed to suggest to users that they invoke {{Admin::describeTopics}} before {{Admin::listOffsets}} in order to handle non-existing topics, at least if this pattern hasn't already been documented as a best practice for people using the Java admin client. Preserving existing behavior (which IMO is valid for the reasons Colin has laid out) seems like the correct move here. was (Author: chrisegerton): Are there any practical use cases for listing the offsets of a just-created topic? Are any of these use cases more likely than ones that would involve describing a just-created topic? It seems a little heavy-handed to suggest to users that they invoke {{Admin::describeTopics}} before {{Admin::listOffsets}} in order to handle non-existing topics, at least if this pattern hasn't already been documented as a best practice for people using the Java admin client. > Compatibility break in Admin.listOffsets() > -- > > Key: KAFKA-12879 > URL: https://issues.apache.org/jira/browse/KAFKA-12879 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0, 2.7.1, 2.6.2 >Reporter: Tom Bentley >Assignee: Philip Nee >Priority: Major > > KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). > Previously it would fail with {{UnknownTopicOrPartitionException}} when a > topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. > It seems this was more or less intentional, even though it would break code > which was expecting and handling the {{UnknownTopicOrPartitionException}}. 
A > workaround is to use {{retries=1}} and inspect the cause of the > {{TimeoutException}}, but this isn't really suitable for cases where the same > Admin client instance is being used for other calls where retries is > desirable. > Furthermore as well as the intended effect on {{listOffsets()}} it seems that > the change could actually affect other methods of Admin. > More generally, the Admin client API is vague about which exceptions can > propagate from which methods. This means that it's not possible to say, in > cases like this, whether the calling code _should_ have been relying on the > {{UnknownTopicOrPartitionException}} or not. -- This message was sent by Atlassian Jira (v8.20.1#820001)
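The workaround described in the issue (inspecting the cause of the {{TimeoutException}}) amounts to walking an exception's cause chain. A minimal sketch under stated assumptions: the two nested exception classes are local stand-ins named after the Kafka types so that the example has no dependencies, and `hasCause` is a hypothetical helper, not part of the Admin client API.

```java
import java.util.concurrent.ExecutionException;

public class CauseInspectionSketch {

    // Stand-in for Kafka's TimeoutException (assumption: carries the original error as its cause).
    static class TimeoutException extends RuntimeException {
        TimeoutException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Stand-in for Kafka's UnknownTopicOrPartitionException.
    static class UnknownTopicOrPartitionException extends RuntimeException {
        UnknownTopicOrPartitionException(final String message) {
            super(message);
        }
    }

    // Walks the cause chain and reports whether any link is of the given type.
    static boolean hasCause(final Throwable error, final Class<? extends Throwable> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        // A future's get() typically wraps the failure in an ExecutionException.
        final Throwable failure = new ExecutionException(
            new TimeoutException("timed out", new UnknownTopicOrPartitionException("no such topic")));
        System.out.println(hasCause(failure, UnknownTopicOrPartitionException.class)); // true
    }
}
```

As the issue notes, this only helps when the client is configured so that the underlying error actually surfaces as the cause, which may not suit clients shared with calls that need retries.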
[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503865#comment-17503865 ] Chris Egerton edited comment on KAFKA-12879 at 3/9/22, 9:50 PM: Are there any practical use cases for listing the offsets of a just-created topic? Are any of these use cases more likely than ones that would involve describing a just-created topic? It seems a little heavy-handed to suggest to users that they invoke {{Admin::describeTopics}} before {{Admin::listOffsets}} in order to handle non-existing topics, at least if this pattern hasn't already been documented as a best practice for people using the Java admin client. was (Author: chrisegerton): Are there any practical use cases for listing the offsets of a just-created topic? Are any of these use cases more likely than ones that would involve describing a just-created topic? > Compatibility break in Admin.listOffsets() > -- > > Key: KAFKA-12879 > URL: https://issues.apache.org/jira/browse/KAFKA-12879 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0, 2.7.1, 2.6.2 >Reporter: Tom Bentley >Assignee: Philip Nee >Priority: Major > > KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). > Previously it would fail with {{UnknownTopicOrPartitionException}} when a > topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. > It seems this was more or less intentional, even though it would break code > which was expecting and handling the {{UnknownTopicOrPartitionException}}. A > workaround is to use {{retries=1}} and inspect the cause of the > {{TimeoutException}}, but this isn't really suitable for cases where the same > Admin client instance is being used for other calls where retries is > desirable. > Furthermore as well as the intended effect on {{listOffsets()}} it seems that > the change could actually affect other methods of Admin. 
> More generally, the Admin client API is vague about which exceptions can > propagate from which methods. This means that it's not possible to say, in > cases like this, whether the calling code _should_ have been relying on the > {{UnknownTopicOrPartitionException}} or not. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-12879) Compatibility break in Admin.listOffsets()
[ https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503865#comment-17503865 ] Chris Egerton commented on KAFKA-12879: --- Are there any practical use cases for listing the offsets of a just-created topic? Are any of these use cases more likely than ones that would involve describing a just-created topic? > Compatibility break in Admin.listOffsets() > -- > > Key: KAFKA-12879 > URL: https://issues.apache.org/jira/browse/KAFKA-12879 > Project: Kafka > Issue Type: Bug > Components: admin >Affects Versions: 2.8.0, 2.7.1, 2.6.2 >Reporter: Tom Bentley >Assignee: Philip Nee >Priority: Major > > KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). > Previously it would fail with {{UnknownTopicOrPartitionException}} when a > topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. > It seems this was more or less intentional, even though it would break code > which was expecting and handling the {{UnknownTopicOrPartitionException}}. A > workaround is to use {{retries=1}} and inspect the cause of the > {{TimeoutException}}, but this isn't really suitable for cases where the same > Admin client instance is being used for other calls where retries is > desirable. > Furthermore as well as the intended effect on {{listOffsets()}} it seems that > the change could actually affect other methods of Admin. > More generally, the Admin client API is vague about which exceptions can > propagate from which methods. This means that it's not possible to say, in > cases like this, whether the calling code _should_ have been relying on the > {{UnknownTopicOrPartitionException}} or not. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503854#comment-17503854 ] Jun Rao commented on KAFKA-13723: - [~xiongqiwu]: Thanks for the explanation. Makes sense. So, this is not an issue. > max.compaction.lag.ms implemented incorrectly > - > > Key: KAFKA-13723 > URL: https://issues.apache.org/jira/browse/KAFKA-13723 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.0 >Reporter: Jun Rao >Priority: Major > > In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced > max.compaction.lag.ms to guarantee that a record be cleaned before a certain > time. > > The implementation in LogCleanerManager has the following code. The path for > earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, > it seems that we should set the delay to 0 so that we could trigger cleaning > immediately since the segment has been dirty for longer than > max.compaction.lag.ms. > > > {code:java} > def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : > Long = { > ... > val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L) > val cleanUntilTime = now - maxCompactionLagMs > if (earliestDirtySegmentTimestamp < cleanUntilTime) > cleanUntilTime - earliestDirtySegmentTimestamp > else > 0L > }{code} >
[jira] [Resolved] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-13723. - Resolution: Not A Problem > max.compaction.lag.ms implemented incorrectly > - > > Key: KAFKA-13723 > URL: https://issues.apache.org/jira/browse/KAFKA-13723 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.0 >Reporter: Jun Rao >Priority: Major > > In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced > max.compaction.lag.ms to guarantee that a record be cleaned before a certain > time. > > The implementation in LogCleanerManager has the following code. The path for > earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, > it seems that we should set the delay to 0 so that we could trigger cleaning > immediately since the segment has been dirty for longer than > max.compaction.lag.ms. > > > {code:java} > def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : > Long = { > ... > val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L) > val cleanUntilTime = now - maxCompactionLagMs > if (earliestDirtySegmentTimestamp < cleanUntilTime) > cleanUntilTime - earliestDirtySegmentTimestamp > else > 0L > }{code} > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] rhauch merged pull request #11872: KAFKA-12879: Remove extra sleep
rhauch merged pull request #11872: URL: https://github.com/apache/kafka/pull/11872
[GitHub] [kafka] rhauch opened a new pull request #11872: KAFKA-12879: Remove extra sleep
rhauch opened a new pull request #11872: URL: https://github.com/apache/kafka/pull/11872 The tests didn't catch the extra sleep that was kept in #11871 after the optimization.
[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503847#comment-17503847 ] xiongqi wu commented on KAFKA-13723: [~junrao] Hi Jun, this function is supposed to capture violations of the max compaction lag. If maxCompactionDelay > 0, the policy has already been violated (that is, the log was not compacted within the configured max.compaction.lag.ms), and the log should be compacted immediately. If maxCompactionDelay = 0, no violation was found, and the log doesn't need to be compacted immediately. The name is a little misleading: maxCompactionDelay doesn't mean that the log cleaner should delay compaction until later. Instead, it means the delay has already happened, and the log should be cleaned immediately. > max.compaction.lag.ms implemented incorrectly > - > > Key: KAFKA-13723 > URL: https://issues.apache.org/jira/browse/KAFKA-13723 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.0 >Reporter: Jun Rao >Priority: Major > > In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced > max.compaction.lag.ms to guarantee that a record be cleaned before a certain > time. > > The implementation in LogCleanerManager has the following code. The path for > earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, > it seems that we should set the delay to 0 so that we could trigger cleaning > immediately since the segment has been dirty for longer than > max.compaction.lag.ms. > > > {code:java} > def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : > Long = { > ... > val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L) > val cleanUntilTime = now - maxCompactionLagMs > if (earliestDirtySegmentTimestamp < cleanUntilTime) > cleanUntilTime - earliestDirtySegmentTimestamp > else > 0L > }{code} >
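To make the reading in the comment above concrete, here is a small Java sketch of the quoted calculation with example numbers. It is an illustration only: the signature with plain `long` parameters is an assumption (the quoted Scala method takes a `UnifiedLog` and elides how `earliestDirtySegmentTimestamp` is obtained), and the class name is hypothetical.

```java
public class MaxCompactionDelaySketch {

    // Mirrors the quoted logic: returns how far past the allowed lag the
    // earliest dirty segment already is, or 0 if it is still within the lag.
    static long maxCompactionDelay(final long earliestDirtySegmentTimestamp,
                                   final long maxCompactionLagMs,
                                   final long now) {
        final long lag = Math.max(maxCompactionLagMs, 0L);
        final long cleanUntilTime = now - lag;
        if (earliestDirtySegmentTimestamp < cleanUntilTime) {
            return cleanUntilTime - earliestDirtySegmentTimestamp;
        }
        return 0L;
    }

    public static void main(final String[] args) {
        final long now = 10_000L;
        final long lagMs = 1_000L; // so cleanUntilTime is 9_000

        // Dirty since 8_500: already 500 ms overdue, so cleaning should start now.
        System.out.println(maxCompactionDelay(8_500L, lagMs, now)); // 500

        // Dirty since 9_500: still within the lag, so there is no violation yet.
        System.out.println(maxCompactionDelay(9_500L, lagMs, now)); // 0
    }
}
```

Read this way, a positive result measures lateness that has already accumulated rather than a time to wait, which matches the resolution of the issue as "Not A Problem".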
[GitHub] [kafka] rhauch merged pull request #11871: KAFKA-12879: Addendum to reduce flakiness of tests
rhauch merged pull request #11871: URL: https://github.com/apache/kafka/pull/11871
[GitHub] [kafka] lihaosky commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore
lihaosky commented on a change in pull request #11802: URL: https://github.com/apache/kafka/pull/11802#discussion_r823057006 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java ## @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.Arrays; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.kafka.streams.state.StateSerdes; + +import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE; +import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize; + +public class PrefixedWindowKeySchemas { + +private static final int PREFIX_SIZE = 1; +private static final byte TIME_FIRST_PREFIX = 0; +private static final byte KEY_FIRST_PREFIX = 1; +private static final int SEQNUM_SIZE = 4; +private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE; + +private static byte extractPrefix(final byte[] binaryBytes) { +return binaryBytes[0]; +} + +public static class TimeFirstWindowKeySchema implements RocksDBSegmentedBytesStore.KeySchema { + +@Override +public Bytes upperRange(final Bytes key, final long to) { +if (key == null) { +// Put next prefix instead of null so that we can start from right prefix +// when scanning backwards +final byte nextPrefix = TIME_FIRST_PREFIX + 1; +return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE).put(nextPrefix).array()); Review comment: Because we have ``` if (prefix != TIME_FIRST_PREFIX) { return false; } ``` in hasNextCondition. We need to start from correct prefix when scanning backward. If we put `to`, starting prefix when we scan backward could be `TIME_FIRST_PREFIX + 1` which would be wrong? ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java ## @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.util.Arrays; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.kstream.Window; +import org.apache.kafka.streams.kstream.Windowed; + +import java.nio.ByteBuffer; +import java.util.List; +import org.apache.kafka.streams.state.StateSerdes; + +import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE; +import static org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize; + +public class PrefixedWindowKeySchemas { + +private static final int PREFIX_SIZE = 1; +private static final byte TIME_FIRST_PREFIX = 0; +private static final byte KEY_FIRST_PREFIX = 1; +private static final int SEQNUM_SIZE = 4; +private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE; + +private static byte extractPrefix(final byte[] binaryBytes) { +return binaryBytes[0]; +} + +public static class TimeFirstWindowKeySchema implements
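The review comment in this message argues that a backward scan must start from the right prefix. A minimal sketch of that idea, assuming a `TreeMap` ordered by unsigned byte comparison as a stand-in for the store's key ordering; the class name, the sample keys, and both helper methods are hypothetical and do not reproduce the store's actual iterator code.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

public class PrefixUpperBoundSketch {

    static final byte TIME_FIRST_PREFIX = 0;
    static final byte KEY_FIRST_PREFIX = 1;

    // Sorted map with lexicographic unsigned byte ordering (requires Java 9+).
    static NavigableMap<byte[], String> newStore() {
        final Comparator<byte[]> order = Arrays::compareUnsigned;
        final NavigableMap<byte[], String> store = new TreeMap<>(order);
        store.put(new byte[] {TIME_FIRST_PREFIX, 10}, "time-first@10");
        store.put(new byte[] {TIME_FIRST_PREFIX, 20}, "time-first@20");
        store.put(new byte[] {KEY_FIRST_PREFIX, 5}, "key-first@5");
        return store;
    }

    // First entry a backward scan sees when no upper bound is given.
    static String firstBackwardUnbounded(final NavigableMap<byte[], String> store) {
        return store.lastEntry().getValue();
    }

    // First entry a backward scan sees when bounded above by the next prefix.
    static String firstBackwardBounded(final NavigableMap<byte[], String> store) {
        final byte[] upper = {(byte) (TIME_FIRST_PREFIX + 1)};
        return store.headMap(upper, false).lastEntry().getValue();
    }

    public static void main(final String[] args) {
        final NavigableMap<byte[], String> store = newStore();
        System.out.println(firstBackwardUnbounded(store)); // key-first@5
        System.out.println(firstBackwardBounded(store));   // time-first@20
    }
}
```

Without the bound, the scan begins on a key with the other prefix, which a check such as `prefix != TIME_FIRST_PREFIX` would reject at once; with the next prefix as the bound, it begins on the last time-first key.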
[GitHub] [kafka] philipnee commented on pull request #11871: Addendum to KAFKA 12879 to fix flaky tests
philipnee commented on pull request #11871: URL: https://github.com/apache/kafka/pull/11871#issuecomment-1063324518 Note: See the commit https://github.com/apache/kafka/commit/28393be6d7416f51eb51f7fe2b075570b45ef09f
[GitHub] [kafka] philipnee opened a new pull request #11871: Addendum to KAFKA 12879 to fix flaky tests
philipnee opened a new pull request #11871: URL: https://github.com/apache/kafka/pull/11871 This is an addendum to KAFKA-12879 to fix the flaky tests introduced in its pull request.
- Add an if check to avoid sleep(0)
- Increase the timeout in the tests
[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly
[ https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503813#comment-17503813 ] Jun Rao commented on KAFKA-13723: - [~xiongqiwu] and [~jjkoshy] : Could you check if this is a real issue? Thanks. > max.compaction.lag.ms implemented incorrectly > - > > Key: KAFKA-13723 > URL: https://issues.apache.org/jira/browse/KAFKA-13723 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.3.0 >Reporter: Jun Rao >Priority: Major > > In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced > max.compaction.lag.ms to guarantee that a record be cleaned before a certain > time. > > The implementation in LogCleanerManager has the following code. The path for > earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, > it seems that we should set the delay to 0 so that we could trigger cleaning > immediately since the segment has been dirty for longer than > max.compaction.lag.ms. > > > {code:java} > def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : > Long = { > ... > val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L) > val cleanUntilTime = now - maxCompactionLagMs > if (earliestDirtySegmentTimestamp < cleanUntilTime) > cleanUntilTime - earliestDirtySegmentTimestamp > else > 0L > }{code} > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly
Jun Rao created KAFKA-13723: --- Summary: max.compaction.lag.ms implemented incorrectly Key: KAFKA-13723 URL: https://issues.apache.org/jira/browse/KAFKA-13723 Project: Kafka Issue Type: Bug Components: core Affects Versions: 2.3.0 Reporter: Jun Rao In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced max.compaction.lag.ms to guarantee that a record be cleaned before a certain time. The implementation in LogCleanerManager has the following code. The path for earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, it seems that we should set the delay to 0 so that we could trigger cleaning immediately since the segment has been dirty for longer than max.compaction.lag.ms. {code:java} def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = { ... val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L) val cleanUntilTime = now - maxCompactionLagMs if (earliestDirtySegmentTimestamp < cleanUntilTime) cleanUntilTime - earliestDirtySegmentTimestamp else 0L }{code} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] guozhangwang commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore
guozhangwang commented on a change in pull request #11802: URL: https://github.com/apache/kafka/pull/11802#discussion_r822940645 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java ## @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.processor.internals.ProcessorContextUtils; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.query.Position; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED; + +public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore implements SegmentedBytesStore { +private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class); + +private final String name; +protected final AbstractSegments segments; +private final String metricScope; +protected final KeySchema baseKeySchema; +protected final Optional indexKeySchema; + + +private ProcessorContext context; +private StateStoreContext stateStoreContext; +private Sensor expiredRecordSensor; +private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP; +private boolean 
consistencyEnabled = false; +private Position position; +protected OffsetCheckpoint positionCheckpoint; +private volatile boolean open; + +AbstractDualSchemaRocksDBSegmentedBytesStore(final String name, + final String metricScope, + final KeySchema baseKeySchema, + final Optional indexKeySchema, + final AbstractSegments segments) { +this.name = name; +this.metricScope = metricScope; +this.baseKeySchema = baseKeySchema; +this.indexKeySchema = indexKeySchema; +this.segments = segments; +} + +@Override +public KeyValueIterator all() { +final List searchSpace = segments.allSegments(true); + +return new SegmentIterator<>( +searchSpace.iterator(), +baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE), +null, +null, +true); +} + +@Override +public KeyValueIterator backwardAll() { +final List searchSpace = segments.allSegments(false); + +return new SegmentIterator<>( +searchSpace.iterator(), +baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE), +null, +null, +false); +} + +@Override +public void remove(final Bytes rawKey) { +final long timestamp = baseKeySchema.segmentTimestamp(rawKey); +observedStreamTime = Math.max(observedStreamTime, timestamp); +final S segment = segments.getSegmentForTimestamp(timestamp); +if (segment == null) { +return; +} +segment.delete(rawKey); +} + +
[GitHub] [kafka] junrao commented on pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured
junrao commented on pull request #11811: URL: https://github.com/apache/kafka/pull/11811#issuecomment-1063288699 @kirktrue : We could just make the changes to expose the proper public API and send an update to the original KIP-768 voting thread so that people are aware of the minor public changes if any.
[GitHub] [kafka] blcksrx commented on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient
blcksrx commented on pull request #11838: URL: https://github.com/apache/kafka/pull/11838#issuecomment-1063283897 @mimaison
[jira] [Commented] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one
[ https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503785#comment-17503785 ]

Jun Rao commented on KAFKA-10690:

[~ocadaruma] : Thanks for filing the jira. Have you tried enabling replication throttling? This will help prevent the out-of-sync replicas from pulling data too aggressively.

> Produce-response delay caused by lagging replica fetch which affects in-sync one
>
> Key: KAFKA-10690
> URL: https://issues.apache.org/jira/browse/KAFKA-10690
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 2.4.1
> Reporter: Haruki Okada
> Priority: Major
> Attachments: image-2020-11-06-11-15-21-781.png, image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png
>
> h2. Our environment
> * Kafka version: 2.4.1
>
> h2. Phenomenon
> * Produce response time 99th (remote scope) degrades to 500ms, which is 20 times worse than usual
> ** Meanwhile, the cluster was running replica reassignment to service-in new machine to recover replicas which held by failed (Hardware issue) broker machine
> !image-2020-11-06-11-15-21-781.png|width=292,height=166!
>
> h2. Analysis
> Let's say
> * broker-X: The broker we observed produce latency degradation
> * broker-Y: The broker under servicing-in
> broker-Y was catching up replicas of partitions:
> * partition-A: has relatively small log size
> * partition-B: has large log size
> (actually, broker-Y was catching-up many other partitions. I noted only two partitions here to make explanation simple)
> broker-X was the leader for both partition-A and partition-B.
> We found that both partition-A and partition-B are assigned to same ReplicaFetcherThread of broker-Y, and produce latency started to degrade right after broker-Y finished catching up partition-A.
> !image-2020-11-06-11-17-09-910.png|width=476,height=174!
> Besides, we observed disk reads on broker-X during service-in. (This is natural since old segments are likely not in page cache)
> !image-2020-11-06-11-15-38-390.png|width=292,height=193!
> So we suspected that:
> * In-sync replica fetch (partition-A) was involved by lagging replica fetch (partition-B), which should be slow because it causes actual disk reads
> ** Since ReplicaFetcherThread sends fetch requests in blocking manner, next fetch request can't be sent until one fetch request completes
> ** => Causes in-sync replica fetch for partitions assigned to same replica fetcher thread to delay
> ** => Causes remote scope produce latency degradation
>
> h2. Possible fix
> We think this issue can be addressed by designating part of ReplicaFetcherThread (or creating another thread pool) for lagging replica catching-up, but not so sure this is the appropriate way.
> Please give your opinions about this issue.

-- This message was sent by Atlassian Jira (v8.20.1#820001)
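The head-of-line blocking suspected in the analysis above can be illustrated with a toy model. This is only a sketch under stated assumptions: the class `FetcherLaneModel`, its method, and the 500 ms / 5 ms durations are hypothetical and are not taken from the Kafka code base. It models a single thread issuing blocking fetches back to back, which is a simplification of how ReplicaFetcherThread actually batches partitions.

```java
import java.util.Arrays;

/** Toy model (hypothetical): one fetcher thread issues blocking fetches one after another. */
final class FetcherLaneModel {

    /** Completion time (ms) of each fetch when a single thread runs them back to back. */
    static long[] completionTimes(long... fetchDurationsMs) {
        long[] done = new long[fetchDurationsMs.length];
        long clock = 0;
        for (int i = 0; i < fetchDurationsMs.length; i++) {
            clock += fetchDurationsMs[i]; // the next fetch cannot start until this one returns
            done[i] = clock;
        }
        return done;
    }

    public static void main(String[] args) {
        long laggingFetchMs = 500; // assumed: partition-B fetch that causes disk reads on the leader
        long inSyncFetchMs = 5;    // assumed: partition-A fetch served from page cache

        // Shared thread: the in-sync fetch queues behind the lagging one.
        long[] shared = completionTimes(laggingFetchMs, inSyncFetchMs);
        // Separate lane for lagging replicas: the in-sync fetch no longer waits.
        long[] dedicated = completionTimes(inSyncFetchMs);

        System.out.println("shared lane:    " + Arrays.toString(shared));    // [500, 505]
        System.out.println("dedicated lane: " + Arrays.toString(dedicated)); // [5]
    }
}
```

In this model the in-sync fetch completes at 505 ms when it shares a lane with the lagging fetch and at 5 ms otherwise. Whether a dedicated thread pool or replication throttling is the better remedy is the open question in the thread.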
[GitHub] [kafka] rhauch merged pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog
rhauch merged pull request #11797: URL: https://github.com/apache/kafka/pull/11797
[GitHub] [kafka] bbejeck edited a comment on pull request #11870: MINOR: jmh.sh swallows compile errors
bbejeck edited a comment on pull request #11870: URL: https://github.com/apache/kafka/pull/11870#issuecomment-1063212582 @ijuma - that was a while ago and I can't remember exactly, but swallowing compiler errors is not good. So I think we should go ahead with @lbradstreet's changes.
[GitHub] [kafka] bbejeck commented on pull request #11870: MINOR: jmh.sh swallows compile errors
bbejeck commented on pull request #11870: URL: https://github.com/apache/kafka/pull/11870#issuecomment-1063212582 @ijuma - that was a while ago I can't remember exactly, but I think we should go ahead with @lbradstreet's changes.
[jira] [Created] (KAFKA-13722) Update internal interfaces that use ProcessorContext to use StateStoreContext instead
Guozhang Wang created KAFKA-13722:

Summary: Update internal interfaces that use ProcessorContext to use StateStoreContext instead
Key: KAFKA-13722
URL: https://issues.apache.org/jira/browse/KAFKA-13722
Project: Kafka
Issue Type: Improvement
Reporter: Guozhang Wang

This is a reminder that when we remove the deprecated public APIs that use the ProcessorContext, like `StateStore.init`, we should also consider updating the internal interfaces that use the ProcessorContext. That includes:
1. Segments and related util classes which use ProcessorContext.
2. For state stores that rely on ProcessorContext.getXXXTime, their logic should be moved out of the state store impl and up to the processor node level that calls these state stores.
[GitHub] [kafka] guozhangwang commented on pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore
guozhangwang commented on pull request #11802: URL: https://github.com/apache/kafka/pull/11802#issuecomment-1063193852 @lihaosky there are some `streams:checkstyleTest` failures in jenkins which should be resolved. Could you take a look into this?
[GitHub] [kafka] kirktrue commented on pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured
kirktrue commented on pull request #11811: URL: https://github.com/apache/kafka/pull/11811#issuecomment-1063190001 No, all of the classes (public and non-public) are in the package together. Separating them out should be trivial from a code standpoint but please advise on what is needed to change that the proper way from a process standpoint (KIP, Jira, etc.).
[GitHub] [kafka] kirktrue edited a comment on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient
kirktrue edited a comment on pull request #11838: URL: https://github.com/apache/kafka/pull/11838#issuecomment-1063188384 Thanks for the PR! It looks good to me. You need to get a committer to look at this in order to get it merged in. Perhaps a `git blame` can turn up someone who's worked on this that you can ping.
[GitHub] [kafka] kirktrue commented on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient
kirktrue commented on pull request #11838: URL: https://github.com/apache/kafka/pull/11838#issuecomment-1063188384 You need to get a committer to look at this for merging. Perhaps a `git blame` can turn up someone who's worked on this that you can ping.
[GitHub] [kafka] ijuma commented on pull request #11870: MINOR: jmh.sh swallows compile errors
ijuma commented on pull request #11870: URL: https://github.com/apache/kafka/pull/11870#issuecomment-1063182074 @bbejeck Do you recall why we did this originally?
[jira] [Created] (KAFKA-13721) Left-join still emit spurious results in stream-stream joins in some cases
Nollet created KAFKA-13721:

Summary: Left-join still emit spurious results in stream-stream joins in some cases
Key: KAFKA-13721
URL: https://issues.apache.org/jira/browse/KAFKA-13721
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 3.1.0
Reporter: Nollet

Stream-stream joins seem to still emit spurious results for some window configurations.
From my tests, it happened when setting before to 0 and having a grace period smaller than the window duration. More precisely, it seems to happen when setting before and window duration > grace period + before.

h2. How to reproduce
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

public class SpuriousLeftJoinTest {

    static final Duration WINDOW_DURATION = Duration.ofMinutes(10);
    static final Duration GRACE = Duration.ofMinutes(6);
    static final Duration BEFORE = Duration.ZERO;

    static final String LEFT_TOPIC_NAME = "LEFT_TOPIC";
    static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC";
    static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC";

    private static TopologyTestDriver testDriver;
    private static TestInputTopic<String, Integer> inputTopicLeft;
    private static TestInputTopic<String, Integer> inputTopicRight;
    private static TestOutputTopic<String, Integer> outputTopic;

    public static Topology createTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME);
        KStream<String, Integer> rightStream = builder.stream(RIGHT_TOPIC_NAME);

        // return 1 if left join matched, otherwise 0
        KStream<String, Integer> joined = leftStream.leftJoin(
            rightStream,
            (value1, value2) -> {
                if (value2 == null) {
                    return 0;
                }
                return 1;
            },
            JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE)
                .before(BEFORE)
        );

        joined.to(OUTPUT_TOPIC_NAME);
        return builder.build();
    }

    @Before
    public void setup() {
        Topology topology = createTopology();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);

        testDriver = new TopologyTestDriver(topology, props);

        inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
        inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
        outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME, Serdes.String().deserializer(), Serdes.Integer().deserializer());
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void shouldEmitOnlyOneMessageForKey1() {
        Instant now = Instant.now();
        inputTopicLeft.pipeInput("key1", 12, now);
        inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION));

        // send later record to increase stream time & close the window
        inputTopicLeft.pipeInput("other_key", 1212122, now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10));

        while (!outputTopic.isEmpty()) {
            System.out.println(outputTopic.readKeyValue());
        }
    }
}
{code}
Stdout of the previous code is
{noformat}
KeyValue(key1, 0)
KeyValue(key1, 1)
{noformat}
However, it should be
{noformat}
KeyValue(key1, 1)
{noformat}
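The window arithmetic behind this report can be checked without Kafka on the classpath. The following is a minimal sketch, assuming the join window is the inclusive interval [leftTs - before, leftTs + after] with after equal to the configured time difference. The class and method names are hypothetical, and `matchesReportedPattern` only encodes the reporter's observed trigger, which is a hypothesis and not a confirmed root cause.

```java
import java.time.Duration;

/** Hypothetical helper that mirrors the durations used in the reproduction above. */
final class JoinWindowArithmetic {

    /** True if rightTs lies in the inclusive window [leftTs - before, leftTs + after]. */
    static boolean joinable(long leftTsMs, long rightTsMs, Duration before, Duration after) {
        return rightTsMs >= leftTsMs - before.toMillis()
            && rightTsMs <= leftTsMs + after.toMillis();
    }

    /** The reporter's observed trigger: window duration > grace period + before. */
    static boolean matchesReportedPattern(Duration window, Duration grace, Duration before) {
        return window.compareTo(grace.plus(before)) > 0;
    }

    public static void main(String[] args) {
        Duration window = Duration.ofMinutes(10);
        Duration grace = Duration.ofMinutes(6);
        Duration before = Duration.ZERO;

        long leftTs = 0L;
        long rightTs = leftTs + window.toMillis(); // right record arrives exactly at the upper bound

        // The two key1 records are joinable, so only the matched result is expected.
        System.out.println(joinable(leftTs, rightTs, before, window));      // true
        // 10 min > 6 min + 0 min, so this configuration fits the reported pattern.
        System.out.println(matchesReportedPattern(window, grace, before));  // true
    }
}
```

Since the right record falls inside the window, the unmatched `KeyValue(key1, 0)` result is the spurious one.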
[GitHub] [kafka] lbradstreet opened a new pull request #11870: MINOR: jmh.sh swallows compile errors
lbradstreet opened a new pull request #11870: URL: https://github.com/apache/kafka/pull/11870 jmh.sh runs tasks in quiet mode which swallows compiler errors. This is a pain and I frequently have to edit the shell script to see the error.
[jira] [Assigned] (KAFKA-8065) Forwarding modified timestamps does not reset timestamp correctly
[ https://issues.apache.org/jira/browse/KAFKA-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josep Prat reassigned KAFKA-8065:

Assignee: (was: Josep Prat)

> Forwarding modified timestamps does not reset timestamp correctly
>
> Key: KAFKA-8065
> URL: https://issues.apache.org/jira/browse/KAFKA-8065
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.1, 2.2.0, 2.1.1
> Reporter: Matthias J. Sax
> Priority: Major
> Fix For: 2.2.0, 2.0.2, 2.1.2
>
> Using Processor API, users can set a new output record timestamp via `context.forward(..., To.all().withTimestamp(...))`. However, after the forward()-call returns, the timestamp is not reset to the original input record timestamp and thus a consecutive call to `context.forward(...)` without `To` will use the newly set output record timestamp from before, too.
[jira] [Assigned] (KAFKA-8065) Forwarding modified timestamps does not reset timestamp correctly
[ https://issues.apache.org/jira/browse/KAFKA-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josep Prat reassigned KAFKA-8065:

Assignee: Josep Prat (was: Matthias J. Sax)

> Forwarding modified timestamps does not reset timestamp correctly
>
> Key: KAFKA-8065
> URL: https://issues.apache.org/jira/browse/KAFKA-8065
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.1, 2.2.0, 2.1.1
> Reporter: Matthias J. Sax
> Assignee: Josep Prat
> Priority: Major
> Fix For: 2.2.0, 2.0.2, 2.1.2
>
> Using Processor API, users can set a new output record timestamp via `context.forward(..., To.all().withTimestamp(...))`. However, after the forward()-call returns, the timestamp is not reset to the original input record timestamp and thus a consecutive call to `context.forward(...)` without `To` will use the newly set output record timestamp from before, too.
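The behaviour described in KAFKA-8065 can be mimicked with a toy context. This is a minimal sketch, not the Kafka Streams implementation: `ToyForwardingContext` and its methods are hypothetical names, and the class only records which timestamp each forwarded record would carry. It contrasts an override that leaks into later calls with one that is restored once the call returns.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a processor context; not the Kafka Streams API. */
final class ToyForwardingContext {
    private long currentTimestamp;
    final List<Long> forwardedTimestamps = new ArrayList<>();

    ToyForwardingContext(long inputRecordTimestamp) {
        this.currentTimestamp = inputRecordTimestamp;
    }

    /** Forward using whatever timestamp the context currently holds. */
    void forward() {
        forwardedTimestamps.add(currentTimestamp);
    }

    /** Leaky variant: the override stays in place after the call returns. */
    void forwardWithTimestampLeaky(long overrideTs) {
        currentTimestamp = overrideTs;
        forward();
    }

    /** Restoring variant: the input record timestamp is put back afterwards. */
    void forwardWithTimestamp(long overrideTs) {
        long saved = currentTimestamp;
        currentTimestamp = overrideTs;
        try {
            forward();
        } finally {
            currentTimestamp = saved;
        }
    }

    public static void main(String[] args) {
        ToyForwardingContext leaky = new ToyForwardingContext(100L);
        leaky.forwardWithTimestampLeaky(999L);
        leaky.forward();
        System.out.println(leaky.forwardedTimestamps); // [999, 999]

        ToyForwardingContext restoring = new ToyForwardingContext(100L);
        restoring.forwardWithTimestamp(999L);
        restoring.forward();
        System.out.println(restoring.forwardedTimestamps); // [999, 100]
    }
}
```

The second `forward()` in the leaky case reuses 999, which corresponds to the symptom in the issue description. The restoring variant uses `finally` so the timestamp is also reset if downstream processing throws.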
[GitHub] [kafka] dajac commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty
dajac commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r822676784

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
     public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();

-        RequestFuture<Void> future = null;
-        if (!coordinatorUnknown()) {
+        RequestFuture<Void> future = null;
+        if (offsets.isEmpty()) {
+            // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+            future = doCommitOffsetsAsync(offsets, callback);
+        } else if (!coordinatorUnknown()) {
             future = doCommitOffsetsAsync(offsets, callback);

Review comment:
There is already a comment for that branch. Don't you think that this is enough?
[GitHub] [kafka] ijuma merged pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
ijuma merged pull request #11722: URL: https://github.com/apache/kafka/pull/11722
[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock
ijuma commented on a change in pull request #11722: URL: https://github.com/apache/kafka/pull/11722#discussion_r822653150 ## File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ## @@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) { boolean exhausted = this.free.queued() > 0; for (Map.Entry> entry : this.batches.entrySet()) { Deque deque = entry.getValue(); + +final ProducerBatch batch; +final long waitedTimeMs; +final boolean backingOff; +final boolean full; + +// Collect as little as possible inside critical region, determine outcome after release synchronized (deque) { -// When producing to a large number of partitions, this path is hot and deques are often empty. -// We check whether a batch exists first to avoid the more expensive checks whenever possible. -ProducerBatch batch = deque.peekFirst(); -if (batch != null) { -TopicPartition part = entry.getKey(); -Node leader = cluster.leaderFor(part); -if (leader == null) { -// This is a partition for which leader is not known, but messages are available to send. -// Note that entries are currently not removed from batches when deque is empty. -unknownLeaderTopics.add(part.topic()); -} else if (!readyNodes.contains(leader) && !isMuted(part)) { -long waitedTimeMs = batch.waitedTimeMs(nowMs); -boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs; -long timeToWaitMs = backingOff ? 
retryBackoffMs : lingerMs; -boolean full = deque.size() > 1 || batch.isFull(); -boolean expired = waitedTimeMs >= timeToWaitMs; -boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting(); -boolean sendable = full -|| expired -|| exhausted -|| closed -|| flushInProgress() -|| transactionCompleting; -if (sendable && !backingOff) { -readyNodes.add(leader); -} else { -long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); -// Note that this results in a conservative estimate since an un-sendable partition may have -// a leader that will later be found to have sendable data. However, this is good enough -// since we'll just wake up and then sleep again for the remaining time. -nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); -} -} +batch = deque.peekFirst(); Review comment: Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
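The idea in the diff above, collecting as little as possible inside the critical region and deciding the outcome after releasing the lock, can be sketched in isolation. This is a simplified illustration under stated assumptions, not the RecordAccumulator code: `ReadyCheckSketch` and `Batch` are hypothetical, `Batch` is immutable so it is safe to read after the lock is released, and the real check also considers exhausted memory, closing, flushing and transaction state.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ReadyCheckSketch {

    /** Hypothetical, immutable stand-in for a producer batch. */
    static final class Batch {
        final long createdMs;
        final int attempts;
        final boolean full;

        Batch(long createdMs, int attempts, boolean full) {
            this.createdMs = createdMs;
            this.attempts = attempts;
            this.full = full;
        }
    }

    /** Snapshot under the lock, then evaluate readiness outside of it. */
    static boolean sendable(Deque<Batch> deque, long nowMs, long lingerMs, long retryBackoffMs) {
        final Batch batch;
        final boolean moreThanOne;
        synchronized (deque) {
            // Only the reads that need a consistent view of the deque happen here.
            batch = deque.peekFirst();
            moreThanOne = deque.size() > 1;
        }
        if (batch == null) {
            return false;
        }
        // Everything below works on the snapshot and does not hold the lock.
        long waitedMs = nowMs - batch.createdMs;
        boolean backingOff = batch.attempts > 0 && waitedMs < retryBackoffMs;
        long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
        boolean full = moreThanOne || batch.full;
        boolean expired = waitedMs >= timeToWaitMs;
        return (full || expired) && !backingOff;
    }

    public static void main(String[] args) {
        Deque<Batch> deque = new ArrayDeque<>();
        deque.add(new Batch(0L, 0, false));
        System.out.println(sendable(deque, 10L, 5L, 100L)); // true: waited past lingerMs
        System.out.println(sendable(deque, 3L, 5L, 100L));  // false: still lingering
    }
}
```

Keeping the synchronized block down to two reads shortens the time other threads appending to the same deque have to wait, which is the motivation stated in the pull request title.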
[GitHub] [kafka] showuon commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty
showuon commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r822634915

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
     public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();

-        RequestFuture<Void> future = null;
-        if (!coordinatorUnknown()) {
+        RequestFuture<Void> future = null;
+        if (offsets.isEmpty()) {
+            // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+            future = doCommitOffsetsAsync(offsets, callback);
+        } else if (!coordinatorUnknown()) {
             future = doCommitOffsetsAsync(offsets, callback);

Review comment:
I wonder whether other devs would think this is a mistake when they see it. At the least, we should leave a comment saying that this is on purpose. WDYT?
[jira] [Commented] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one
[ https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503574#comment-17503574 ]

Luke Chen commented on KAFKA-10690:

[~ocadaruma] , thanks for reporting. One question: Are you sure this issue is due to the `in-sync` replica fetch? Could you build a PoC that adds an additional thread pool for lagging replicas to confirm this solution? Thank you.
[GitHub] [kafka] dajac commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty
dajac commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r822622560

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
     public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();

-        RequestFuture<Void> future = null;
-        if (!coordinatorUnknown()) {
+        RequestFuture<Void> future = null;
+        if (offsets.isEmpty()) {
+            // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+            future = doCommitOffsetsAsync(offsets, callback);
+        } else if (!coordinatorUnknown()) {
             future = doCommitOffsetsAsync(offsets, callback);

Review comment:
I considered this but I think that it is clearer when kept separated.
[GitHub] [kafka] showuon commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty
showuon commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r822620734

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
     public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         invokeCompletedOffsetCommitCallbacks();

-        RequestFuture<Void> future = null;
-        if (!coordinatorUnknown()) {
+        RequestFuture<Void> future = null;
+        if (offsets.isEmpty()) {
+            // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+            future = doCommitOffsetsAsync(offsets, callback);
+        } else if (!coordinatorUnknown()) {
             future = doCommitOffsetsAsync(offsets, callback);

Review comment:
Could we do this and add the comment above?
```java
// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
if (offsets.isEmpty() || !coordinatorUnknown()) {
    future = doCommitOffsetsAsync(offsets, callback);
}
```

## File path: core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -2483,6 +2483,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     }
   }

+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateAndCloseConsumerWithNoAccess(quorum: String): Unit = {
+    val consumer = createConsumer()
+    try {
+      // Close consumer without consuming anything. close() call should pass successfully and throw no exception.
+      consumer.close()
+    } catch {
+      case e: Throwable =>
+        fail(s"Exception not expected on closing consumer: $e")
+    }

Review comment:
Could we use `assertDoesNotThrow` instead? I.e.:
```scala
assertDoesNotThrow(() => consumer.close(), "Exception not expected on closing consumer")
```
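The combined condition suggested in the review comment above relies on short-circuit evaluation: when the offsets map is empty, the coordinator check is never evaluated. The following is a minimal sketch of that property. `CommitGuardSketch`, its counter, and the `Map<String, Long>` offsets type are hypothetical simplifications and are not the ConsumerCoordinator API.

```java
import java.util.Collections;
import java.util.Map;

/** Hypothetical stand-in that counts how often the coordinator check runs. */
final class CommitGuardSketch {
    int coordinatorChecks = 0;
    private final boolean coordinatorKnown;

    CommitGuardSketch(boolean coordinatorKnown) {
        this.coordinatorKnown = coordinatorKnown;
    }

    private boolean coordinatorUnknown() {
        coordinatorChecks++; // stands in for the potentially expensive lookup
        return !coordinatorKnown;
    }

    /** True when the commit can be issued right away. */
    boolean canCommitImmediately(Map<String, Long> offsets) {
        // For empty offsets the right-hand side is skipped entirely.
        return offsets.isEmpty() || !coordinatorUnknown();
    }

    public static void main(String[] args) {
        CommitGuardSketch guard = new CommitGuardSketch(false);
        System.out.println(guard.canCommitImmediately(Collections.emptyMap())); // true
        System.out.println(guard.coordinatorChecks);                            // 0
        System.out.println(guard.canCommitImmediately(
            Collections.singletonMap("topic-0", 42L)));                         // false
        System.out.println(guard.coordinatorChecks);                            // 1
    }
}
```

Both the separated branches in the pull request and the combined condition skip the coordinator lookup for empty offsets. The remaining disagreement in the thread is about readability, not behaviour.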
[GitHub] [kafka] soarez commented on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure
soarez commented on pull request #9577: URL: https://github.com/apache/kafka/pull/9577#issuecomment-1062889765 @mumrah @hachikuji @bbejeck can anyone review this?
[GitHub] [kafka] abdulshaikh76 commented on pull request #8831: KAFKA-8657:Client-side automatic topic creation on Producer
abdulshaikh76 commented on pull request #8831: URL: https://github.com/apache/kafka/pull/8831#issuecomment-1062882178 Are these changes available? If so, in which version of the kafka-clients jar? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] sunshujie1990 commented on pull request #11869: KAFKA-13719: fix connector restart cause duplicate tasks
sunshujie1990 commented on pull request #11869: URL: https://github.com/apache/kafka/pull/11869#issuecomment-1062864985 @hachikuji Jason, please help to review it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13720) Few topic partitions remain under replicated after broker lose connectivity to zookeeper
Dhirendra Singh created KAFKA-13720: --- Summary: Few topic partitions remain under replicated after broker lose connectivity to zookeeper Key: KAFKA-13720 URL: https://issues.apache.org/jira/browse/KAFKA-13720 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 2.7.1 Reporter: Dhirendra Singh A few topic partitions remain under-replicated after brokers lose connectivity to ZooKeeper. It only happens when the loss of connectivity results in a change of the active controller. The issue does not always occur; it appears randomly. It never occurs when the active controller does not change while brokers lose connectivity to ZooKeeper. I found the following error message in the log file. [2022-02-28 04:01:20,217] WARN [Partition __consumer_offsets-4 broker=1] Controller failed to update ISR to PendingExpandIsr(isr=Set(1), newInSyncReplicaId=2) due to unexpected UNKNOWN_SERVER_ERROR. Retrying. (kafka.cluster.Partition) [2022-02-28 04:01:20,217] ERROR [broker-1-to-controller] Uncaught error in request completion: (org.apache.kafka.clients.NetworkClient) java.lang.IllegalStateException: Failed to enqueue `AlterIsr` request with state LeaderAndIsr(leader=1, leaderEpoch=2728, isr=List(1, 2), zkVersion=4719) for partition __consumer_offsets-4 at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1403) at kafka.cluster.Partition.$anonfun$handleAlterIsrResponse$1(Partition.scala:1438) at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1417) at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1398) at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1398) at kafka.server.AlterIsrManagerImpl.$anonfun$handleAlterIsrResponse$8(AlterIsrManager.scala:166) at kafka.server.AlterIsrManagerImpl.$anonfun$handleAlterIsrResponse$8$adapted(AlterIsrManager.scala:163) at scala.collection.immutable.List.foreach(List.scala:333) at 
kafka.server.AlterIsrManagerImpl.handleAlterIsrResponse(AlterIsrManager.scala:163) at kafka.server.AlterIsrManagerImpl.responseHandler$1(AlterIsrManager.scala:94) at kafka.server.AlterIsrManagerImpl.$anonfun$sendRequest$2(AlterIsrManager.scala:104) at kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManagerImpl.scala:175) at kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManagerImpl.scala:158) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:586) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:578) at kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:71) at kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManagerImpl.scala:183) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96) The under-replicated partition count goes back to zero after the controller broker is restarted, but this requires manual intervention. The expectation is that when the brokers reconnect to ZooKeeper, the cluster should return to a stable state, with an under-replicated partition count of zero, by itself and without any manual intervention. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] (KAFKA-13719) connector restart cause duplicate tasks
[ https://issues.apache.org/jira/browse/KAFKA-13719 ] Shujie Sun deleted comment on KAFKA-13719: was (Author: sunshujie): https://github.com/apache/kafka/pull/11869 > connector restart cause duplicate tasks > --- > > Key: KAFKA-13719 > URL: https://issues.apache.org/jira/browse/KAFKA-13719 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Shujie Sun >Priority: Critical > > Restart connector with parameter includeTasks=true=false cause > duplicate tasks and duplicate message。 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13719) connector restart cause duplicate tasks
[ https://issues.apache.org/jira/browse/KAFKA-13719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503502#comment-17503502 ] Shujie Sun commented on KAFKA-13719: https://github.com/apache/kafka/pull/11869 > connector restart cause duplicate tasks > --- > > Key: KAFKA-13719 > URL: https://issues.apache.org/jira/browse/KAFKA-13719 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Shujie Sun >Priority: Critical > > Restart connector with parameter includeTasks=true=false cause > duplicate tasks and duplicate message。 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13719) connector restart cause duplicate tasks
[ https://issues.apache.org/jira/browse/KAFKA-13719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shujie Sun updated KAFKA-13719: --- Attachment: (was: image-2022-03-09-18-57-09-467.png) > connector restart cause duplicate tasks > --- > > Key: KAFKA-13719 > URL: https://issues.apache.org/jira/browse/KAFKA-13719 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.0.0 >Reporter: Shujie Sun >Priority: Critical > > Restart connector with parameter includeTasks=true=false cause > duplicate tasks and duplicate message。 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] sunshujie1990 opened a new pull request #11869: KAFKA-13719: fix connector restart cause duplicate tasks
sunshujie1990 opened a new pull request #11869: URL: https://github.com/apache/kafka/pull/11869 When Kafka Connect restarts a connector with includeTasks=true, DistributedHerder starts all tasks without filtering them by currentAssignments. This results in duplicate tasks and duplicate data. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
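The filtering step described in the pull request above can be illustrated with a minimal sketch. It assumes task IDs are plain strings; `RestartTaskFilter` and `tasksToStart` are hypothetical names made up for this illustration and are not part of Kafka Connect or of the actual patch. The sketch only shows the idea of starting a task from the restart plan when it is in this worker's current assignment.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not a Kafka Connect class.
final class RestartTaskFilter {

    private RestartTaskFilter() {
    }

    // Returns the tasks from the restart plan that this worker currently owns,
    // preserving the order of the restart plan. Tasks assigned to other workers
    // are skipped, so this worker never starts a second copy of them.
    static List<String> tasksToStart(final Collection<String> tasksInRestartPlan,
                                     final Set<String> currentAssignments) {
        final List<String> result = new ArrayList<>();
        for (final String taskId : tasksInRestartPlan) {
            if (currentAssignments.contains(taskId)) {
                result.add(taskId);
            }
        }
        return result;
    }
}
```

Under these assumptions, a worker that owns only tasks `c-0` and `c-2` would start just those two when the restart plan lists `c-0`, `c-1`, and `c-2`.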
[jira] [Created] (KAFKA-13719) connector restart cause duplicate tasks
Shujie Sun created KAFKA-13719: -- Summary: connector restart cause duplicate tasks Key: KAFKA-13719 URL: https://issues.apache.org/jira/browse/KAFKA-13719 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.0.0 Reporter: Shujie Sun Attachments: image-2022-03-09-18-57-09-467.png Restarting a connector with the parameter includeTasks=true=false causes duplicate tasks and duplicate messages. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13699) ProcessorContext does not expose Stream Time
[ https://issues.apache.org/jira/browse/KAFKA-13699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503493#comment-17503493 ] Mickael Maison commented on KAFKA-13699: Even if it was voted and partly landed in 3.0.0, I think we should only add the missing APIs in 3.2.0. > ProcessorContext does not expose Stream Time > > > Key: KAFKA-13699 > URL: https://issues.apache.org/jira/browse/KAFKA-13699 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: Shay Lin >Priority: Major > Labels: newbie > > As a KS developer, I would like to leverage > [KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext] > and access stream time in Processor Context. > _(Updated)_ > However, the methods currentStreamTimeMs or currentSystemTimeMs is missing > from for KStreams 3.0+. > Checked with [~mjsax] , the methods are absent from the Processor API , i.e. > * org.apache.kafka.streams.processor.api.ProcessorContext -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] mimaison commented on pull request #11817: KAFKA-13438: Replace EasyMock and PowerMock with Mockito in WorkerTest
mimaison commented on pull request #11817: URL: https://github.com/apache/kafka/pull/11817#issuecomment-1062783063 Thanks for the update. There's an unused import that is failing [checkstyle](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11817/3/pipeline): ``` [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11817/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java:23:8: Unused import - org.apache.kafka.connect.runtime.WorkerTest. [UnusedImports] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman opened a new pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning
ableegoldman opened a new pull request #11868: URL: https://github.com/apache/kafka/pull/11868 This test has started to become flaky at a relatively low, but consistently reproducible, rate. Upon inspection, we find this is due to IOExceptions during the #cleanUpNamedTopology call -- specifically, most often a `DirectoryNotEmptyException` with an occasional `FileNotFoundException`. Basically, signs pointed to having returned from/completed the `#removeNamedTopology` future prematurely, and moving on to try and clear out the topology's state directory while there was a StreamThread somewhere that was continuing to process/close its tasks. I believe this is due to updating the thread's topology version _before_ we perform the actual topology update, in this case specifically the act of, e.g., clearing out a directory. If one thread updates its version and then goes to perform the topology removal/cleanup when the second thread finishes its own topology removal, this other thread will check whether all threads are on the latest version and complete any waiting futures if so -- which means it can complete the future before the first thread has actually completed the corresponding action. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
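The ordering problem described in the pull request above can be sketched in isolation. The class below is a simplified, hypothetical model and not the Kafka Streams implementation: `TopologyVersionTracker`, `requestUpdate`, and `applyUpdate` are names invented for this illustration, and it uses `CompletableFuture` in place of `KafkaFuture`. It shows the safe ordering, assuming each thread runs its topology action first and only then publishes its new version, so that a waiting future cannot complete while another thread's action is still in progress.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified model for illustration; not a Kafka Streams class.
final class TopologyVersionTracker {
    private final Map<String, Long> threadVersions = new HashMap<>();
    private final Map<Long, CompletableFuture<Void>> pendingFutures = new HashMap<>();
    private long latestVersion = 0L;

    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, latestVersion);
    }

    // A caller requests a topology change and gets a future that completes
    // once every registered thread has finished applying it.
    synchronized CompletableFuture<Void> requestUpdate() {
        latestVersion++;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        pendingFutures.put(latestVersion, future);
        return future;
    }

    private synchronized long latestVersion() {
        return latestVersion;
    }

    // Safe ordering: run the action (e.g. closing tasks, clearing a directory)
    // BEFORE publishing the new version. Publishing first would let another
    // thread see "all threads caught up" while this action is still running.
    void applyUpdate(final String threadName, final Runnable action) {
        final long targetVersion = latestVersion();
        action.run();
        markCaughtUp(threadName, targetVersion);
    }

    private synchronized void markCaughtUp(final String threadName, final long version) {
        threadVersions.put(threadName, version);
        final boolean allCaughtUp =
            threadVersions.values().stream().allMatch(v -> v >= version);
        if (allCaughtUp) {
            final CompletableFuture<Void> future = pendingFutures.remove(version);
            if (future != null) {
                future.complete(null);
            }
        }
    }
}
```

With two registered threads, the future returned by `requestUpdate()` stays incomplete after the first thread calls `applyUpdate` and completes only after the second thread's action has also finished.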