[GitHub] [kafka] lkokhreidze commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

2022-03-09 Thread GitBox


lkokhreidze commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063755741


   @cadonna done!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


ableegoldman commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823421770



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/RemoveNamedTopologyResult.java
##
@@ -52,34 +60,54 @@ public RemoveNamedTopologyResult(final KafkaFuture<Void> removeTopologyFuture) {
  * successfully completed their corresponding {@link KafkaFuture}.
  */
 public final KafkaFuture<Void> all() {
-final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+if (resetOffsetsFuture == null) {
+return removeTopologyFuture;
+} else {
+return resetOffsetsFuture;

Review comment:
   Basically we now have the caller thread perform the offset reset and 
block on it when it goes to call `get()` on the future returned by 
`RemoveNamedTopologyResult#all` (or `#resetOffsetsFuture`) 
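
   A minimal sketch of that call pattern, assuming `streams` is a started `KafkaStreamsNamedTopologyWrapper` with a topology named "topology-1" currently running (names are illustrative):
   ```java
   // assuming: import org.apache.kafka.streams.processor.internals.namedtopology.*;
   void removeAndBlock(final KafkaStreamsNamedTopologyWrapper streams) throws Exception {
       // Kick off the removal, with resetOffsets = true:
       final RemoveNamedTopologyResult result = streams.removeNamedTopology("topology-1", true);
       // The caller thread performs the offset reset inside get(), not a StreamThread:
       result.all().get();
   }
   ```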








[GitHub] [kafka] ableegoldman commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


ableegoldman commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823421009



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -264,48 +267,46 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
 }
 }
 
-private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
-                                               final Set<TopicPartition> partitionsToReset) {
-if (!partitionsToReset.isEmpty()) {
-removeTopologyFuture.whenComplete((v, throwable) -> {
-if (throwable != null) {
-removeTopologyFuture.completeExceptionally(throwable);
-}
-DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-while (deleteOffsetsResult == null) {
-try {
-deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-    applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-deleteOffsetsResult.all().get();
-} catch (final InterruptedException ex) {
-ex.printStackTrace();
-break;
-} catch (final ExecutionException ex) {
-if (ex.getCause() != null &&
-ex.getCause() instanceof GroupSubscribedToTopicException &&
-ex.getCause()
-.getMessage()
-.equals("Deleting offsets of a topic is 
forbidden while the consumer group is actively subscribed to it.")) {
-ex.printStackTrace();
-} else if (ex.getCause() != null &&
-ex.getCause() instanceof GroupIdNotFoundException) {
-log.debug("The offsets have been reset by another 
client or the group has been deleted, no need to retry further.");
-break;
-} else {
-removeTopologyFuture.completeExceptionally(ex);
-}
-deleteOffsetsResult = null;
-}
-try {
-Thread.sleep(100);
-} catch (final InterruptedException ex) {
-ex.printStackTrace();
+private void resetOffsets(final Set<TopicPartition> partitionsToReset) throws StreamsException {

Review comment:
   Sorry for the large diff -- it's mainly due to spacing from having moved the `!partitionsToReset.isEmpty()` check, plus one small stylistic change to use a `while (true)` loop with `break`s, because following the null status of the `deleteOffsetsResult` was a bit confusing.
   
   The real change though is that this method now just performs the offset 
resets directly, rather than directing whoever completes the 
`removeNamedTopology` future to perform the offset reset (which is non-trivial 
and thus not appropriate for the StreamThreads to do).
   
   We now invoke this directly when the user calls `get()` on the future 
returned from the RemoveNamedTopologyResult.
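   
   A minimal sketch of the retry loop's new shape (simplified, with assumed parameters; the real method also surfaces unexpected failures to the caller):
   ```java
   import java.util.Set;
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.errors.GroupIdNotFoundException;
   
   class ResetOffsetsSketch {
       static void resetOffsets(final Admin adminClient,
                                final String applicationId,
                                final Set<TopicPartition> partitionsToReset) throws InterruptedException {
           while (true) {
               try {
                   adminClient.deleteConsumerGroupOffsets(applicationId, partitionsToReset).all().get();
                   break; // offsets deleted successfully
               } catch (final ExecutionException ex) {
                   if (ex.getCause() instanceof GroupIdNotFoundException) {
                       break; // already reset by another client, or the group was deleted
                   }
                   // e.g. GroupSubscribedToTopicException: the group hasn't fully dropped the topology yet -- retry
               }
               Thread.sleep(100); // brief back-off between attempts
           }
       }
   }
   ```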
   
   This is the main change since being approved @wcarlson5  @guozhangwang 
   
   There's also the 








[GitHub] [kafka] cadonna commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

2022-03-09 Thread GitBox


cadonna commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063745684


   @lkokhreidze Could you please rebase this PR on the latest trunk? Some system tests fail, probably due to the absence of a fix on the PR branch.






[GitHub] [kafka] ableegoldman commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


ableegoldman commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823417406



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -239,12 +238,16 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo
 final boolean skipResetForUnstartedApplication =
 maybeCompleteFutureIfStillInCREATED(removeTopologyFuture, "removing topology " + topologyToRemove);
 
-if (resetOffsets && !skipResetForUnstartedApplication) {
+if (resetOffsets && !skipResetForUnstartedApplication && !partitionsToReset.isEmpty()) {

Review comment:
   Moved the `!partitionsToReset.isEmpty()` check here to make sure we 
don't log the line about resetting offsets if we don't actually have any 
offsets to reset








[GitHub] [kafka] RivenSun2 commented on pull request #11819: MINOR: Optimize the generateFieldToString method of MessageDataGenerator

2022-03-09 Thread GitBox


RivenSun2 commented on pull request #11819:
URL: https://github.com/apache/kafka/pull/11819#issuecomment-1063741527


   Hi @dajac @cmccabe 
   Could you help review this PR?
   Thanks.






[jira] [Updated] (KAFKA-13722) Update internal interfaces that use ProcessorContext to use StateStoreContext instead

2022-03-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13722:

Component/s: streams

> Update internal interfaces that use ProcessorContext to use StateStoreContext 
> instead
> -
>
> Key: KAFKA-13722
> URL: https://issues.apache.org/jira/browse/KAFKA-13722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> This is a reminder that when we remove the deprecated public APIs that use 
> the ProcessorContext, like `StateStore.init`, we should also consider 
> updating the internal interfaces that use the ProcessorContext as well. That 
> includes:
> 1. Segments and related util classes which use ProcessorContext.
> 2. For state stores that rely on ProcessorContext.getXXXTime, their logic 
> should be moved out of the state store implementation and into the processor 
> node level that calls these state stores.
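
For orientation, a condensed view of the two {{init}} overloads on {{StateStore}} (the ProcessorContext variant is the deprecated one; other methods elided):
{code:java}
public interface StateStore {
    @Deprecated
    void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root);

    void init(org.apache.kafka.streams.processor.StateStoreContext context, StateStore root);

    // ... other methods elided
}
{code}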



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-8065) Forwarding modified timestamps does not reset timestamp correctly

2022-03-09 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8065:
--

Assignee: Matthias J. Sax

> Forwarding modified timestamps does not reset timestamp correctly
> -
>
> Key: KAFKA-8065
> URL: https://issues.apache.org/jira/browse/KAFKA-8065
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1, 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
> Fix For: 2.2.0, 2.0.2, 2.1.2
>
>
> Using the Processor API, users can set a new output record timestamp via 
> `context.forward(..., To.all().withTimestamp(...))`. However, after the 
> forward() call returns, the timestamp is not reset to the original input 
> record timestamp, and thus a subsequent call to `context.forward(...)` 
> without `To` will also use the newly set output record timestamp from before.
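
A minimal sketch of the buggy scenario (old Processor API; the key/value types and the 42L timestamp are illustrative):
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

public class TimestampResetIllustration extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        // Forward once with an explicit output timestamp:
        context().forward(key, value, To.all().withTimestamp(42L));
        // Expected: this plain forward() uses the input record's timestamp.
        // Observed (the bug): it silently reuses 42L from the call above.
        context().forward(key, value);
    }
}
{code}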





[GitHub] [kafka] guozhangwang commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


guozhangwang commented on pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#issuecomment-1063695247


   > Anyways I'll push a fix for this, and then we should be good to go 
   
   Thanks @wcarlson5 for the report! @ableegoldman please feel free to move on 
afterwards.






[GitHub] [kafka] peterwanner commented on pull request #11855: MINOR: Clarify producer batch.size in upgrade docs

2022-03-09 Thread GitBox


peterwanner commented on pull request #11855:
URL: https://github.com/apache/kafka/pull/11855#issuecomment-1063660862


   > @peterwanner, since we're reaching the end of the release, please let me know if you can't work on it before next weekend. Thank you.
   
   @showuon OK, I will finish it this afternoon; please wait a bit. Thank you so much for the reminder.






[GitHub] [kafka] ableegoldman commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


ableegoldman commented on pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#issuecomment-1063651734


   Ah, good catch @wcarlson5 -- I need to move the offset reset logic out of the actual `removeNamedTopology` call and make sure we don't block on the offsets being removed until the `get()` -- otherwise we get a deadlock if we have two Streams clients and try to remove a named topology from both from a single thread. (To clarify for anyone else: this is not really a realistic/recommended usage pattern for real applications, but it helps keep the tests simple and makes for more intuitive blocking behavior anyway.)
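   
   A minimal sketch of the test-style usage that deadlocked, assuming `clientA`/`clientB` are two Streams clients in the same application driven from one thread (names are illustrative):
   ```java
   // inside a single test thread:
   final RemoveNamedTopologyResult resultA = clientA.removeNamedTopology("topology-1", true);
   final RemoveNamedTopologyResult resultB = clientB.removeNamedTopology("topology-1", true);
   // Blocking (and the offset reset) happens only here, once both clients have initiated removal:
   resultA.all().get();
   resultB.all().get();
   ```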
   
   Anyways I'll push a fix for this, and then we should be good to go  






[GitHub] [kafka] wcarlson5 commented on pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


wcarlson5 commented on pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#issuecomment-1063618150


   
`shouldAllowRemovingAndAddingNamedTopologyToRunningApplicationWithMultipleNodesAndResetsOffsets` is deadlocking. We need it to be at least a little async, or else it can't make progress.






[GitHub] [kafka] guozhangwang commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


guozhangwang commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823308917



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
 
 private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
                                                final Set<TopicPartition> partitionsToReset) {
-if (!partitionsToReset.isEmpty()) {
-removeTopologyFuture.whenComplete((v, throwable) -> {
-if (throwable != null) {
-removeTopologyFuture.completeExceptionally(throwable);
-}
-DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-while (deleteOffsetsResult == null) {
-try {
-deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-    applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-deleteOffsetsResult.all().get();
-} catch (final InterruptedException ex) {
-ex.printStackTrace();
+final KafkaFutureImpl<Void> resetOffsetsFuture = new KafkaFutureImpl<>();
+try {
+removeTopologyFuture.get();

Review comment:
   Got it, that makes a lot of sense, thanks!








[GitHub] [kafka] showuon merged pull request #11863: KAFKA-13689: Revert AbstractConfig code changes

2022-03-09 Thread GitBox


showuon merged pull request #11863:
URL: https://github.com/apache/kafka/pull/11863


   






[jira] [Resolved] (KAFKA-13717) KafkaConsumer.close throws authorization exception even when commit offsets is empty

2022-03-09 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13717?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13717.
---
Fix Version/s: 3.2.0
 Assignee: Vincent Jiang
   Resolution: Fixed

> KafkaConsumer.close throws authorization exception even when commit offsets 
> is empty
> 
>
> Key: KAFKA-13717
> URL: https://issues.apache.org/jira/browse/KAFKA-13717
> Project: Kafka
>  Issue Type: Bug
>  Components: unit tests
>Reporter: Vincent Jiang
>Assignee: Vincent Jiang
>Priority: Major
> Fix For: 3.2.0
>
>
> When offsets is empty and the coordinator is unknown, KafkaConsumer.close didn't 
> throw an exception before commit 
> [https://github.com/apache/kafka/commit/4b468a9d81f7380f7197a2a6b859c1b4dca84bd9]. 
> After this commit, KafkaConsumer.close may throw an authorization exception.
>  
> The root cause is that in the commit, the logic was changed to call 
> lookupCoordinator even if offsets is empty. 
>  
> Even if a consumer doesn't have access to a group or a topic, it might be 
> better to not throw an authorization exception in this case, because the close() 
> call doesn't actually access any resource.
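
A self-contained sketch of the guard described above (illustrative types, not Kafka's internal ones; {{lookupCoordinatorThenCommit}} is a hypothetical stand-in for the real coordinator path):
{code:java}
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class EmptyCommitSketch {
    static CompletableFuture<Void> commitOffsetsAsync(final Map<String, Long> offsets) {
        if (offsets.isEmpty()) {
            // An empty commit completes locally, so no coordinator lookup
            // (and hence no authorization check) is needed.
            return CompletableFuture.completedFuture(null);
        }
        return lookupCoordinatorThenCommit(offsets); // may surface GroupAuthorizationException
    }

    static CompletableFuture<Void> lookupCoordinatorThenCommit(final Map<String, Long> offsets) {
        return CompletableFuture.completedFuture(null); // placeholder
    }
}
{code}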





[GitHub] [kafka] showuon commented on pull request #11855: MINOR: Clarify producer batch.size in upgrade docs

2022-03-09 Thread GitBox


showuon commented on pull request #11855:
URL: https://github.com/apache/kafka/pull/11855#issuecomment-1063601218


   @peterwanner, since we're reaching the end of the release, please let me know if you can't work on it before next weekend. Thank you.






[GitHub] [kafka] showuon merged pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty

2022-03-09 Thread GitBox


showuon merged pull request #11864:
URL: https://github.com/apache/kafka/pull/11864


   






[GitHub] [kafka] showuon commented on pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty

2022-03-09 Thread GitBox


showuon commented on pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#issuecomment-1063598542


   Failed tests are unrelated.
   ```
   Build / PowerPC / kafka.network.SocketServerTest.testIdleConnection()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOffsetSyncsTopicsOnTarget()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTest.testReplication()
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector
   Build / JDK 11 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, 
Name=testPreferredReplicaElection, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / kafka.api.ConsumerBounceTest.testClose()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 11 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   Build / JDK 17 and Scala 2.13 / 
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
   ```






[jira] [Commented] (KAFKA-10160) Kafka MM2 consumer configuration

2022-03-09 Thread Harsha Madduri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503939#comment-17503939
 ] 

Harsha Madduri commented on KAFKA-10160:


Is there a reason we are holding this back from going into trunk? [~ryannedolan], [~sbellapu]

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
> Fix For: 2.4.2, 2.8.0
>
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51]
> According to this, producer/consumer-level properties should be configured as, 
> e.g., somesource->sometarget.consumer.client.id. I tried to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch from earliest. Not sure if this is a 
> bug or my misconfiguration, but at least some update to the docs would be useful.
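
For reference, a hedged sketch of the per-flow override syntax described in MirrorMakerConfig (an mm2.properties fragment; the cluster aliases are taken from the report above):
{code}
clusters = somesource, sometarget
somesource->sometarget.enabled = true
# per-flow consumer override, the pattern under discussion:
somesource->sometarget.consumer.auto.offset.reset = latest
{code}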





[GitHub] [kafka] showuon commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty

2022-03-09 Thread GitBox


showuon commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r823268006



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
 public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
 invokeCompletedOffsetCommitCallbacks();
 
-RequestFuture<Void> future =  null;
-if (!coordinatorUnknown()) {
+RequestFuture<Void> future = null;
+if (offsets.isEmpty()) {
+// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+future = doCommitOffsetsAsync(offsets, callback);
+} else if (!coordinatorUnknown()) {
 future = doCommitOffsetsAsync(offsets, callback);

Review comment:
   I checked again, and I think it's OK. Thanks.








[GitHub] [kafka] showuon commented on a change in pull request #11874: Fix typos in configuration docs

2022-03-09 Thread GitBox


showuon commented on a change in pull request #11874:
URL: https://github.com/apache/kafka/pull/11874#discussion_r823265635



##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -404,8 +404,8 @@
 "org.apache.kafka.common.serialization.Serde interface.";
 
 public static final String WINDOWED_INNER_CLASS_SERDE = 
"windowed.inner.class.serde";
-private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the \" +\n" +
-"\"org.apache.kafka.common.serialization.Serde interface.. Note that setting this config in KafkaStreams application would result " +
+private static final String WINDOWED_INNER_CLASS_SERDE_DOC = " Default serializer / deserializer for the inner class of a windowed record. Must implement the " +
+"org.apache.kafka.common.serialization.Serde interface. Note that setting this config in KafkaStreams application would result " +

Review comment:
   Nice catch! Note, the current doc is like this:
   
![image](https://user-images.githubusercontent.com/43372967/157572498-c1200a0f-ac13-4f8d-b991-7d2f2e2e1d11.png)
   

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
##
@@ -216,8 +216,10 @@
 private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
     + " Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of"
     + " message re-ordering after a failed send due to retries (i.e., if retries are enabled)."
-    + " Additionally, enabling idempotence requires this config value to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
-    + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.";
+    + " Additionally, enabling idempotence requires the value of this configuration to be less than or equal to " + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_FOR_IDEMPOTENCE + "."
+    + " If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. "
+    + " Record ordering is preserved when enable.idempotence is set to true for idempotent "
+    + " producer (or transactional producer), even when max in-flight requests are greater than 1 (supported up to 5).";

Review comment:
   Actually, I think the original version is better. Could you revert this 
change, and only fix the typo in stream config?
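   
   For anyone following along, a hedged illustration of the settings this doc string covers (with idempotence enabled, up to 5 in-flight requests still preserve ordering):
   ```java
   // assuming: import java.util.Properties; import org.apache.kafka.clients.producer.ProducerConfig;
   final Properties props = new Properties();
   props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
   props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
   ```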








[GitHub] [kafka] showuon edited a comment on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

2022-03-09 Thread GitBox


showuon edited a comment on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063569496


   I'll start to take a look this week. Sorry for the late response.






[GitHub] [kafka] showuon commented on pull request #10802: KAFKA-6718 / Update SubscriptionInfoData with clientTags

2022-03-09 Thread GitBox


showuon commented on pull request #10802:
URL: https://github.com/apache/kafka/pull/10802#issuecomment-1063569496


   I'll start to take a look this week. Sorry for being late.






[jira] [Commented] (KAFKA-13690) Flaky test EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]

2022-03-09 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503932#comment-17503932
 ] 

Guozhang Wang commented on KAFKA-13690:
---

May cover `shouldCommitCorrectOffsetIfInputTopicIsTransactional`.

> Flaky test 
> EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown[at_least_once]
> ---
>
> Key: KAFKA-13690
> URL: https://issues.apache.org/jira/browse/KAFKA-13690
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> The _at_least_once_ version of the 
> "{*}EosIntegrationTest.shouldWriteLatestOffsetsToCheckpointOnShutdown"{*} 
> test is occasionally failing with
> h3. Error Message
> java.lang.AssertionError: The committed records do not match what expected 
> Expected: <[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), 
> KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), 
> KeyValue(0, 36), KeyValue(0, 45)]> but: was <[KeyValue(0, 0), KeyValue(0, 1), 
> KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 10), KeyValue(0, 
> 11), KeyValue(0, 13), KeyValue(0, 16), KeyValue(0, 20), KeyValue(0, 25), 
> KeyValue(0, 31), KeyValue(0, 38)]>
>  
> Seems we are receiving more than the expected records.
> ...of course, this is an ALOS flavor of the {*}EOS{*}IntegrationTest, so 
> perhaps we shouldn't be running this variant at all? Not sure if this 
> explains the exact output we receive but it certainly seems suspicious
>  
> Added at_least_once in [https://github.com/apache/kafka/pull/11283]





[GitHub] [kafka] ableegoldman commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


ableegoldman commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823239035



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
 
 private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
                                                final Set<TopicPartition> partitionsToReset) {
-if (!partitionsToReset.isEmpty()) {
-removeTopologyFuture.whenComplete((v, throwable) -> {
-if (throwable != null) {
-removeTopologyFuture.completeExceptionally(throwable);
-}
-DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-while (deleteOffsetsResult == null) {
-try {
-deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-    applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-deleteOffsetsResult.all().get();
-} catch (final InterruptedException ex) {
-ex.printStackTrace();
+final KafkaFutureImpl<Void> resetOffsetsFuture = new KafkaFutureImpl<>();
+try {
+removeTopologyFuture.get();

Review comment:
   Yeah, that is the main fix; however, I realized that we are currently in this awkward state of pseudo-async-ness, and I think we might ultimately want to scrap this whole `RemoveNamedTopologyResult` and just make it fully blocking. I didn't want to go ahead and change the method signatures just yet, though, so I just have it block on the named topology future and then perform the offset reset.
   
   The actual advantage here is that before this, we were actually making the 
StreamThread who completed the future perform the offset reset, which of course 
means it gets stuck for a bit and can't continue processing until basically the 
whole group has dropped this named topology. Better to have the caller thread 
do the offset reset to let the StreamThreads keep processing the other 
topologies.
   
   (When we get to finally doing a KIP maybe we can discuss having a blocking 
and non-blocking option for these, but my feeling is let's not complicate 
things unnecessarily and it may be that we only really need a blocking version)








[GitHub] [kafka] ableegoldman commented on pull request #11867: KAFKA-12648: fix #getMinThreadVersion and include IOException + topologyName in StreamsException when topology dir cleanup fails

2022-03-09 Thread GitBox


ableegoldman commented on pull request #11867:
URL: https://github.com/apache/kafka/pull/11867#issuecomment-1063520672


   Merged to trunk






[GitHub] [kafka] ableegoldman merged pull request #11867: KAFKA-12648: fix #getMinThreadVersion and include IOException + topologyName in StreamsException when topology dir cleanup fails

2022-03-09 Thread GitBox


ableegoldman merged pull request #11867:
URL: https://github.com/apache/kafka/pull/11867


   






[GitHub] [kafka] ableegoldman commented on pull request #11867: KAFKA-12648: fix #getMinThreadVersion and include IOException + topologyName in StreamsException when topology dir cleanup fails

2022-03-09 Thread GitBox


ableegoldman commented on pull request #11867:
URL: https://github.com/apache/kafka/pull/11867#issuecomment-1063520314


   Found a somewhat more involved bug that was responsible for this test 
failing, so I opened a second bugfix PR you can find here
   
   Without this followup PR, the test may continue to see failures (as we see 
in the build results here). 
   
   But otherwise test failures are unrelated, going to merge this






[GitHub] [kafka] lihaosky commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore

2022-03-09 Thread GitBox


lihaosky commented on a change in pull request #11802:
URL: https://github.com/apache/kafka/pull/11802#discussion_r823220988



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
##
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Arrays;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
+import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+
+public class PrefixedWindowKeySchemas {
+
+private static final int PREFIX_SIZE = 1;
+private static final byte TIME_FIRST_PREFIX = 0;
+private static final byte KEY_FIRST_PREFIX = 1;
+private static final int SEQNUM_SIZE = 4;
+private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+private static byte extractPrefix(final byte[] binaryBytes) {
+return binaryBytes[0];
+}
+
+public static class TimeFirstWindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
+
+@Override
+public Bytes upperRange(final Bytes key, final long to) {
+if (key == null) {
+// Put next prefix instead of null so that we can start from 
right prefix
+// when scanning backwards
+final byte nextPrefix = TIME_FIRST_PREFIX + 1;
+return 
Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE).put(nextPrefix).array());
+}
+byte[] maxKey = new byte[key.get().length];
+Arrays.fill(maxKey, (byte) 0xFF);
+return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE 
+ maxKey.length + SEQNUM_SIZE)
+.put(TIME_FIRST_PREFIX)
+.putLong(to)
+.put(maxKey).putInt(Integer.MAX_VALUE)
+.array());
+}
+
+@Override
+public Bytes lowerRange(final Bytes key, final long from) {
+if (key == null) {
+return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + 
TIMESTAMP_SIZE)
+.put(TIME_FIRST_PREFIX)
+.putLong(from)
+.array());
+}
+
+/*
+ * Larger timestamp or key's byte order can't be smaller than this 
lower range. Reason:
+ * 1. Timestamp is fixed length (with big endian byte order). 
Since we put timestamp
+ *first, larger timestamp will have larger byte order.
+ * 2. If timestamp is the same but key (k1) is larger than 
this lower range key (k2):
+ * a. If k2 is not a prefix of k1, then k1 will always 
have larger byte order no
+ *matter what seqnum k2 has
+ * b. If k2 is a prefix of k1, since k2's seqnum is 0, 
after k1 appends seqnum,
+ *it will always be larger than (k1 + seqnum).
+ */
+return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE 
+ key.get().length)
+.put(TIME_FIRST_PREFIX)
+.putLong(from)
+.put(key.get())
+.array());
+}
+
+@Override
+public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
+return TimeFirstWindowKeySchema.toStoreKeyBinary(key, Math.max(0, 
from),
+0);
+}
+
+@Override
+public Bytes upperRangeFixedSize(final Bytes key, final long to) {
+return TimeFirstWindowKeySchema.toStoreKeyBinary(key, to, 
Integer.MAX_VALUE);
+}
+
+@Override
+public long segmentTimestamp(final Bytes key) {
+return TimeFirstWindowKeySchema.extractStoreTimestamp(key.get());
+}
+
+@Override
+public HasNextCondition hasNextCondition(final Bytes 

[jira] [Comment Edited] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one

2022-03-09 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503911#comment-17503911
 ] 

Haruki Okada edited comment on KAFKA-10690 at 3/10/22, 12:14 AM:
-

Thanks for the comment. 

 

[~showuon] 

> Are you sure this issue is due to the `in-sync` replica fetch?

 

Yeah, as long as a replica fetch is `out-of-sync` it doesn't block the 
produce request, so the issue happens only for the `in-sync` replica, when `in-sync` 
replica fetching and `out-of-sync` replica fetching are done in the same replica 
fetcher thread on the follower side.

 

> Could you have a PoC to add an additional thread pool for lagging replica to 
> confirm this solution?

 

Haven't tried, as we wanted to confirm first whether anyone has encountered a similar issue 
(and whether anyone addressed it in some way). But let us consider it!

 

[~junrao] 

> Have you tried enabling replication throttling?

 

Yeah, we use replication throttling, and we suppose the disk's performance itself 
is stable even during lagging-replica fetches.

We use HDDs, so reading the data takes a few to tens of milliseconds per IO even 
when the disk is stable.

So if a lagging replica fetch (likely not in the page cache, so it causes disk reads) and 
an in-sync replica fetch are done in the same replica fetcher thread, the in-sync one 
is greatly affected by the lagging one.


was (Author: ocadaruma):
Thanks for the comment. 

 

[~showuon] 

 

> Are you sure this issue is due to the `in-sync` replica fetch?

 

Yeah, as long as a replica fetch is `out-of-sync` it doesn't block the 
produce request, so the issue happens only for the `in-sync` replica, when `in-sync` 
replica fetching and `out-of-sync` replica fetching are done in the same replica 
fetcher thread on the follower side.

 

> Could you have a PoC to add an additional thread pool for lagging replica to 
> confirm this solution?

 

Haven't tried, as we wanted to confirm first whether anyone has encountered a similar issue 
(and whether anyone addressed it in some way). But let us consider it!

 

[~junrao] 

 

> Have you tried enabling replication throttling?

 

Yeah, we use replication throttling, and we suppose the disk's performance itself 
is stable even during lagging-replica fetches.

We use HDDs, so reading the data takes a few to tens of milliseconds per IO even 
when the disk is stable.

So if a lagging replica fetch (likely not in the page cache, so it causes disk reads) and 
an in-sync replica fetch are done in the same replica fetcher thread (i.e. in the same 
Fetch request), the in-sync one is greatly affected by the lagging one.

> Produce-response delay caused by lagging replica fetch which affects in-sync 
> one
> 
>
> Key: KAFKA-10690
> URL: https://issues.apache.org/jira/browse/KAFKA-10690
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Haruki Okada
>Priority: Major
> Attachments: image-2020-11-06-11-15-21-781.png, 
> image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png
>
>
> h2. Our environment
>  * Kafka version: 2.4.1
> h2. Phenomenon
>  * Produce response time 99th (remote scope) degrades to 500ms, which is 20 
> times worse than usual
> ** Meanwhile, the cluster was running replica reassignment to service-in a new 
> machine to recover replicas which were held by a failed (hardware issue) broker 
> machine
> !image-2020-11-06-11-15-21-781.png|width=292,height=166!
> h2. Analysis
> Let's say
>  * broker-X: The broker we observed produce latency degradation
>  * broker-Y: The broker under servicing-in
> broker-Y was catching up replicas of partitions:
>  * partition-A: has relatively small log size
>  * partition-B: has large log size
> (actually, broker-Y was catching-up many other partitions. I noted only two 
> partitions here to make explanation simple)
> broker-X was the leader for both partition-A and partition-B.
> We found that both partition-A and partition-B are assigned to same 
> ReplicaFetcherThread of broker-Y, and produce latency started to degrade 
> right after broker-Y finished catching up partition-A.
> !image-2020-11-06-11-17-09-910.png|width=476,height=174!
> Besides, we observed disk reads on broker-X during service-in. (This is 
> natural since old segments are likely not in page cache)
> !image-2020-11-06-11-15-38-390.png|width=292,height=193!
> So we suspected that:
>  * In-sync replica fetch (partition-A) was held up by lagging replica fetch 
> (partition-B), which should be slow because it causes actual disk reads
>  ** Since ReplicaFetcherThread sends fetch requests in blocking manner, next 
> fetch request can't be sent until one fetch request completes
>  ** => Causes in-sync replica fetch for partitions assigned to same replica 
> fetcher thread to delay
>  ** => Causes remote scope produce latency degradation
> h2. Possible fix
> We 

[GitHub] [kafka] guozhangwang commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore

2022-03-09 Thread GitBox


guozhangwang commented on a change in pull request #11802:
URL: https://github.com/apache/kafka/pull/11802#discussion_r823215870



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
##
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Arrays;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
+import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+
+public class PrefixedWindowKeySchemas {
+
+private static final int PREFIX_SIZE = 1;
+private static final byte TIME_FIRST_PREFIX = 0;
+private static final byte KEY_FIRST_PREFIX = 1;
+private static final int SEQNUM_SIZE = 4;
+private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+private static byte extractPrefix(final byte[] binaryBytes) {
+return binaryBytes[0];
+}
+
+public static class TimeFirstWindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
+
+@Override
+public Bytes upperRange(final Bytes key, final long to) {
+if (key == null) {
+// Put next prefix instead of null so that we can start from 
right prefix
+// when scanning backwards
+final byte nextPrefix = TIME_FIRST_PREFIX + 1;
+return 
Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE).put(nextPrefix).array());
+}
+byte[] maxKey = new byte[key.get().length];
+Arrays.fill(maxKey, (byte) 0xFF);
+return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE 
+ maxKey.length + SEQNUM_SIZE)
+.put(TIME_FIRST_PREFIX)
+.putLong(to)
+.put(maxKey).putInt(Integer.MAX_VALUE)
+.array());
+}
+
+@Override
+public Bytes lowerRange(final Bytes key, final long from) {
+if (key == null) {
+return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + 
TIMESTAMP_SIZE)
+.put(TIME_FIRST_PREFIX)
+.putLong(from)
+.array());
+}
+
+/*
+ * Larger timestamp or key's byte order can't be smaller than this 
lower range. Reason:
+ * 1. Timestamp is fixed length (with big endian byte order). 
Since we put timestamp
+ *first, larger timestamp will have larger byte order.
+ * 2. If timestamp is the same but key (k1) is larger than 
this lower range key (k2):
+ * a. If k2 is not a prefix of k1, then k1 will always 
have larger byte order no
+ *matter what seqnum k2 has
+ * b. If k2 is a prefix of k1, since k2's seqnum is 0, 
after k1 appends seqnum,
+ *it will always be larger than (k1 + seqnum).
+ */
+return Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE + TIMESTAMP_SIZE 
+ key.get().length)
+.put(TIME_FIRST_PREFIX)
+.putLong(from)
+.put(key.get())
+.array());
+}
+
+@Override
+public Bytes lowerRangeFixedSize(final Bytes key, final long from) {
+return TimeFirstWindowKeySchema.toStoreKeyBinary(key, Math.max(0, 
from),
+0);
+}
+
+@Override
+public Bytes upperRangeFixedSize(final Bytes key, final long to) {
+return TimeFirstWindowKeySchema.toStoreKeyBinary(key, to, 
Integer.MAX_VALUE);
+}
+
+@Override
+public long segmentTimestamp(final Bytes key) {
+return TimeFirstWindowKeySchema.extractStoreTimestamp(key.get());
+}
+
+@Override
+public HasNextCondition hasNextCondition(final Bytes 

[GitHub] [kafka] vincent81jiang commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty

2022-03-09 Thread GitBox


vincent81jiang commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r823215546



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -2483,6 +2483,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 }
   }
 
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateAndCloseConsumerWithNoAccess(quorum: String): Unit = {
+val consumer = createConsumer()
+try {
+  // Close consumer without consuming anything. close() call should pass 
successfully and throw no exception.
+  consumer.close()
+} catch {
+  case e: Throwable =>
+fail(s"Exception not expected on closing consumer: $e")
+}

Review comment:
   @showuon, sounds good. Updated to use assertDoesNotThrow. 
   
   Note: I cannot pass "() => consumer.close()" directly to assertDoesNotThrow 
because it causes compile error:
   ```
   ambiguous reference to overloaded definition,
   both method assertDoesNotThrow in class Assertions of type [T](x$1: 
org.junit.jupiter.api.function.ThrowingSupplier[T], x$2: String): T
   and  method assertDoesNotThrow in class Assertions of type (x$1: 
org.junit.jupiter.api.function.Executable, x$2: String): Unit
   match argument types (() => Unit,String)
   ```








[jira] [Commented] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one

2022-03-09 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503911#comment-17503911
 ] 

Haruki Okada commented on KAFKA-10690:
--

Thanks for the comment. 

 

[~showuon] 

 

> Are you sure this issue is due to the `in-sync` replica fetch?

 

Yeah, as long as a replica fetch is `out-of-sync` it doesn't block the 
produce request, so the issue happens only for the `in-sync` replica, when `in-sync` 
replica fetching and `out-of-sync` replica fetching are done in the same replica 
fetcher thread on the follower side.

 

> Could you have a PoC to add an additional thread pool for lagging replica to 
> confirm this solution?

 

Haven't tried, as we wanted to confirm first whether anyone has encountered a similar issue 
(and whether anyone addressed it in some way). But let us consider it!

 

[~junrao] 

 

> Have you tried enabling replication throttling?

 

Yeah, we use replication throttling, and we suppose the disk's performance itself 
is stable even during lagging-replica fetches.

We use HDDs, so reading the data takes a few to tens of milliseconds per IO even 
when the disk is stable.

So if a lagging replica fetch (likely not in the page cache, so it causes disk reads) and 
an in-sync replica fetch are done in the same replica fetcher thread (i.e. in the same 
Fetch request), the in-sync one is greatly affected by the lagging one.

> Produce-response delay caused by lagging replica fetch which affects in-sync 
> one
> 
>
> Key: KAFKA-10690
> URL: https://issues.apache.org/jira/browse/KAFKA-10690
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Haruki Okada
>Priority: Major
> Attachments: image-2020-11-06-11-15-21-781.png, 
> image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png
>
>
> h2. Our environment
>  * Kafka version: 2.4.1
> h2. Phenomenon
>  * Produce response time 99th (remote scope) degrades to 500ms, which is 20 
> times worse than usual
> ** Meanwhile, the cluster was running replica reassignment to service-in a new 
> machine to recover replicas which were held by a failed (hardware issue) broker 
> machine
> !image-2020-11-06-11-15-21-781.png|width=292,height=166!
> h2. Analysis
> Let's say
>  * broker-X: The broker we observed produce latency degradation
>  * broker-Y: The broker under servicing-in
> broker-Y was catching up replicas of partitions:
>  * partition-A: has relatively small log size
>  * partition-B: has large log size
> (actually, broker-Y was catching-up many other partitions. I noted only two 
> partitions here to make explanation simple)
> broker-X was the leader for both partition-A and partition-B.
> We found that both partition-A and partition-B are assigned to same 
> ReplicaFetcherThread of broker-Y, and produce latency started to degrade 
> right after broker-Y finished catching up partition-A.
> !image-2020-11-06-11-17-09-910.png|width=476,height=174!
> Besides, we observed disk reads on broker-X during service-in. (This is 
> natural since old segments are likely not in page cache)
> !image-2020-11-06-11-15-38-390.png|width=292,height=193!
> So we suspected that:
>  * In-sync replica fetch (partition-A) was held up by lagging replica fetch 
> (partition-B), which should be slow because it causes actual disk reads
>  ** Since ReplicaFetcherThread sends fetch requests in blocking manner, next 
> fetch request can't be sent until one fetch request completes
>  ** => Causes in-sync replica fetch for partitions assigned to same replica 
> fetcher thread to delay
>  ** => Causes remote scope produce latency degradation
> h2. Possible fix
> We think this issue can be addressed by designating part of 
> ReplicaFetcherThread (or creating another thread pool) for lagging replica 
> catching-up, but not so sure this is the appropriate way.
> Please give your opinions about this issue.
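
An illustrative sketch (hypothetical types, not Kafka's actual fetcher code) of the head-of-line blocking described above: one fetcher loop services both an in-sync and a lagging partition, so a slow disk-bound read for the lagging one delays the next fetch for the in-sync one.
{code:java}
import java.util.List;

final class FetcherLoopSketch {
    interface PartitionFetch {
        void fetchOnce() throws InterruptedException; // blocking; slow when data is not in the page cache
    }

    static void runLoop(final List<PartitionFetch> assigned) throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
            for (final PartitionFetch p : assigned) {
                // A multi-millisecond HDD read here stalls every other partition
                // assigned to this thread, including in-sync ones.
                p.fetchOnce();
            }
        }
    }
}
{code}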





[GitHub] [kafka] guozhangwang commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore

2022-03-09 Thread GitBox


guozhangwang commented on a change in pull request #11802:
URL: https://github.com/apache/kafka/pull/11802#discussion_r823200925



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
+private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+
+private final String name;
+protected final AbstractSegments segments;
+private final String metricScope;
+protected final KeySchema baseKeySchema;
+protected final Optional indexKeySchema;
+
+
+private ProcessorContext context;
+private StateStoreContext stateStoreContext;

Review comment:
   Filed https://issues.apache.org/jira/browse/KAFKA-13722




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-03-09 Thread GitBox


guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1063494691


   cc @ableegoldman 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11796: KAFKA-13152: Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-03-09 Thread GitBox


guozhangwang commented on pull request #11796:
URL: https://github.com/apache/kafka/pull/11796#issuecomment-1063494547


   Re-triggered jenkins.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 edited a comment on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-09 Thread GitBox


wcarlson5 edited a comment on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1063493748


   @vvcephei @cadonna 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-09 Thread GitBox


wcarlson5 commented on pull request #11873:
URL: https://github.com/apache/kafka/pull/11873#issuecomment-1063493748


   @vvcephei


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] norwood closed pull request #6508: MINOR: replication factor is short

2022-03-09 Thread GitBox


norwood closed pull request #6508:
URL: https://github.com/apache/kafka/pull/6508


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #11858: [Emit final][3/N] add emit final param to relevant windows

2022-03-09 Thread GitBox


guozhangwang commented on pull request #11858:
URL: https://github.com/apache/kafka/pull/11858#issuecomment-1063480596


   BTW, since this is an API change, I think the above discussion should happen 
on the email thread rather than just on this PR as well. So the sooner we put 
up a KIP for public discussion, the better.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] joel-hamill opened a new pull request #11874: Fix typos in configuration docs

2022-03-09 Thread GitBox


joel-hamill opened a new pull request #11874:
URL: https://github.com/apache/kafka/pull/11874


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-09 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503891#comment-17503891
 ] 

Randall Hauch commented on KAFKA-12879:
---

The approach we decided to take was to revert the previous admin client changes 
from KAFKA-12339 to bring the admin client behavior back to previous 
expectations, and to implement retries within the KafkaBasedLog to handle cases 
like those identified in that issue.

For example, a likely root cause of KAFKA-12339 was that a Connect worker 
instantiates its KafkaConfigBackingStore (and other internal topic stores), 
which creates a KafkaBasedLog that, as part of start(), creates the topic if it 
doesn't exist and then immediately tries to read the offsets. That reading of 
offsets can fail if the metadata for the newly created topic hasn't been 
propagated to all of the brokers. We can solve this particular root cause 
easily by retrying the reading of offsets within the KafkaBasedLog's start() 
method, and since topic metadata should be propagated relatively quickly, we 
don't need to retry for long – most of the time we'd succeed within a few 
retries.
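
For illustration, a minimal sketch of that bounded-retry shape (the interface, 
names, and timeouts below are made up for this sketch, not the actual 
KafkaBasedLog code):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class OffsetReadRetry {

    interface OffsetReader {
        // May throw UnknownTopicOrPartitionException while the newly created
        // topic's metadata is still propagating to all brokers.
        void readEndOffsets();
    }

    // Retry reading offsets within a bounded window; metadata propagation is
    // usually fast, so a short deadline and backoff are enough.
    static void readWithRetry(final OffsetReader reader, final Duration timeout)
            throws InterruptedException {
        final long deadlineMs = System.currentTimeMillis() + timeout.toMillis();
        while (true) {
            try {
                reader.readEndOffsets();
                return;
            } catch (final UnknownTopicOrPartitionException e) {
                if (System.currentTimeMillis() >= deadlineMs) {
                    throw e; // give up once the deadline expires
                }
                Thread.sleep(100); // brief backoff before the next attempt
            }
        }
    }
}
{code}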

I've just merged to trunk a PR that does this. When trying to backport this, 
some of the newer tests were flaky, so [~pnee] created another PR (plus 
another) to hopefully eliminate that flakiness, and it seemed to work. 

I'm in the process of backporting this all the way back to 2.5 branch, since 
that's how far back the regression from KAFKA-12339 was backported.

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Philip Nee
>Priority: Major
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore as well as the intended effect on {{listOffsets()}} it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13725) KIP-768 OAuth code mixes public and internal classes in same package

2022-03-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-13725:
--
Description: 
The {{org.apache.kafka.common.security.oauthbearer.secured}} package from 
KIP-768 incorrectly mixed all of the classes (public and internal) in the 
package together.

This bug is to remove all but the public classes from that package and move the 
rest to a new {{org.apache.kafka.common.security.oauthbearer.secured.internal}} 
package. This should be back-ported to all versions in which the KIP-768 OAuth 
work occurs.

  was:
The {{org.apache.kafka.common.security.oauthbearer.secured}} package from 
KIP-768 incorrectly mixed all of the classes (public and internal) in the 
package together.

This bug is to remove all but the public classes from that package and move the 
rest to a new {{org.apache.kafka.common.security.oauthbearer.secured.internal}} 
package. This should be packported to all versions in which the KIP-768 OAuth 
work occurs.


> KIP-768 OAuth code mixes public and internal classes in same package
> 
>
> Key: KAFKA-13725
> URL: https://issues.apache.org/jira/browse/KAFKA-13725
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.2.0, 3.1.1
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{org.apache.kafka.common.security.oauthbearer.secured}} package from 
> KIP-768 incorrectly mixed all of the classes (public and internal) in the 
> package together.
> This bug is to remove all but the public classes from that package and move 
> the rest to a new 
> {{org.apache.kafka.common.security.oauthbearer.secured.internal}} package. 
> This should be back-ported to all versions in which the KIP-768 OAuth work 
> occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] kirktrue commented on pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured

2022-03-09 Thread GitBox


kirktrue commented on pull request #11811:
URL: https://github.com/apache/kafka/pull/11811#issuecomment-1063442392


   I have filed KAFKA-13725 to address the package layout bug.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13725) KIP-768 OAuth code mixes public and internal classes in same package

2022-03-09 Thread Kirk True (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-13725:
--
Description: 
The {{org.apache.kafka.common.security.oauthbearer.secured}} package from 
KIP-768 incorrectly mixed all of the classes (public and internal) in the 
package together.

This bug is to remove all but the public classes from that package and move the 
rest to a new {{org.apache.kafka.common.security.oauthbearer.secured.internal}} 
package. This should be packported to all versions in which the KIP-768 OAuth 
work occurs.

> KIP-768 OAuth code mixes public and internal classes in same package
> 
>
> Key: KAFKA-13725
> URL: https://issues.apache.org/jira/browse/KAFKA-13725
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.1.0, 3.2.0, 3.1.1
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Major
>
> The {{org.apache.kafka.common.security.oauthbearer.secured}} package from 
> KIP-768 incorrectly mixed all of the classes (public and internal) in the 
> package together.
> This bug is to remove all but the public classes from that package and move 
> the rest to a new 
> {{org.apache.kafka.common.security.oauthbearer.secured.internal}} package. 
> This should be packported to all versions in which the KIP-768 OAuth work 
> occurs.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] wcarlson5 opened a new pull request #11873: MINOR: Revert "KAFKA-13542: add rebalance reason in Kafka Streams (#11804)"

2022-03-09 Thread GitBox


wcarlson5 opened a new pull request #11873:
URL: https://github.com/apache/kafka/pull/11873


   This reverts commit 2ccc834faa3fffcd5d15d2463aeef3ee6f5cea13. We were seeing 
serious regressions in our state-heavy benchmarks. We're not yet sure how this 
change was causing it, but reverting it fixed the issues that started with this 
commit.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13726) Fix Vulnerability CVE-2022-23181 -Upgrade org.apache.tomcat.embed_tomcat-embed-core

2022-03-09 Thread Chris Sabelstrom (Jira)
Chris Sabelstrom created KAFKA-13726:


 Summary: Fix Vulnerability CVE-2022-23181 -Upgrade 
org.apache.tomcat.embed_tomcat-embed-core
 Key: KAFKA-13726
 URL: https://issues.apache.org/jira/browse/KAFKA-13726
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.1
Reporter: Chris Sabelstrom


Our security scanner detected the following vulnerability. Please upgrade to 
the version noted in the Fix Status column.
|CVE ID|Severity|Packages|Package Version|CVSS|Fix Status|
|CVE-2022-23181|high|org.apache.tomcat.embed_tomcat-embed-core|9.0.54|7|fixed 
in 10.0.0, 9.0.1|



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13725) KIP-768 OAuth code mixes public and internal classes in same package

2022-03-09 Thread Kirk True (Jira)
Kirk True created KAFKA-13725:
-

 Summary: KIP-768 OAuth code mixes public and internal classes in 
same package
 Key: KAFKA-13725
 URL: https://issues.apache.org/jira/browse/KAFKA-13725
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.0, 3.2.0, 3.1.1
Reporter: Kirk True
Assignee: Kirk True






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13724) Fix Vulnerability CVE-2021-43859 - Upgrade com.thoughtworks.xstream_xstream

2022-03-09 Thread Chris Sabelstrom (Jira)
Chris Sabelstrom created KAFKA-13724:


 Summary: Fix Vulnerability CVE-2021-43859 - Upgrade 
com.thoughtworks.xstream_xstream
 Key: KAFKA-13724
 URL: https://issues.apache.org/jira/browse/KAFKA-13724
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.8.1
Reporter: Chris Sabelstrom


Our security scanner detected the following vulnerability. Please upgrade to 
the version noted in the Fix Status column.
|CVE ID|Severity|Packages|Package Version|CVSS|Fix Status|
|CVE-2021-43859|high|com.thoughtworks.xstream_xstream|1.4.18|7.5|fixed in 
1.4.19|



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] guozhangwang commented on a change in pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


guozhangwang commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823130503



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final 
KafkaFutureImpl<Void>
 
 private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> 
removeTopologyFuture,
final Set<TopicPartition> 
partitionsToReset) {
-if (!partitionsToReset.isEmpty()) {
-removeTopologyFuture.whenComplete((v, throwable) -> {
-if (throwable != null) {
-removeTopologyFuture.completeExceptionally(throwable);
-}
-DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-while (deleteOffsetsResult == null) {
-try {
-deleteOffsetsResult = 
adminClient.deleteConsumerGroupOffsets(
-
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), 
partitionsToReset);
-deleteOffsetsResult.all().get();
-} catch (final InterruptedException ex) {
-ex.printStackTrace();
+final KafkaFutureImpl<Void> resetOffsetsFuture = new 
KafkaFutureImpl<>();
+try {
+removeTopologyFuture.get();

Review comment:
   Why do we have to wait on the first future before moving on to construct 
the second future now? I thought the main fix was only in 
https://github.com/apache/kafka/pull/11868/files#diff-8baa5d7209fc00074bf3fe24d709c2dcf2a44c1623d7ced8c0e29c1d832a3bcbR1132
 above, and with that we would not need to change the behavior to wait for the 
topology removal to complete?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503865#comment-17503865
 ] 

Chris Egerton edited comment on KAFKA-12879 at 3/9/22, 9:51 PM:


Are there any practical use cases for listing the offsets of a just-created 
topic? Are any of these use cases more likely than ones that would involve 
describing a just-created topic?

It seems a little heavy-handed to suggest to users that they invoke 
{{Admin::describeTopics}} before {{Admin::listOffsets}} in order to handle 
non-existing topics, at least if this pattern hasn't already been documented as 
a best practice for people using the Java admin client.

Preserving existing behavior (which IMO is valid for the reasons Colin has laid 
out) seems like the correct move here.


was (Author: chrisegerton):
Are there any practical use cases for listing the offsets of a just-created 
topic? Are any of these use cases more likely than ones that would involve 
describing a just-created topic?

It seems a little heavy-handed to suggest to users that they invoke 
{{Admin::describeTopics}} before {{Admin::listOffsets}} in order to handle 
non-existing topics, at least if this pattern hasn't already been documented as 
a best practice for people using the Java admin client.

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Philip Nee
>Priority: Major
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore as well as the intended effect on {{listOffsets()}} it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503865#comment-17503865
 ] 

Chris Egerton edited comment on KAFKA-12879 at 3/9/22, 9:50 PM:


Are there any practical use cases for listing the offsets of a just-created 
topic? Are any of these use cases more likely than ones that would involve 
describing a just-created topic?

It seems a little heavy-handed to suggest to users that they invoke 
{{Admin::describeTopics}} before {{Admin::listOffsets}} in order to handle 
non-existing topics, at least if this pattern hasn't already been documented as 
a best practice for people using the Java admin client.


was (Author: chrisegerton):
Are there any practical use cases for listing the offsets of a just-created 
topic? Are any of these use cases more likely than ones that would involve 
describing a just-created topic?

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Philip Nee
>Priority: Major
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore as well as the intended effect on {{listOffsets()}} it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-12879) Compatibility break in Admin.listOffsets()

2022-03-09 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503865#comment-17503865
 ] 

Chris Egerton commented on KAFKA-12879:
---

Are there any practical use cases for listing the offsets of a just-created 
topic? Are any of these use cases more likely than ones that would involve 
describing a just-created topic?

> Compatibility break in Admin.listOffsets()
> --
>
> Key: KAFKA-12879
> URL: https://issues.apache.org/jira/browse/KAFKA-12879
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.8.0, 2.7.1, 2.6.2
>Reporter: Tom Bentley
>Assignee: Philip Nee
>Priority: Major
>
> KAFKA-12339 incompatibly changed the semantics of Admin.listOffsets(). 
> Previously it would fail with {{UnknownTopicOrPartitionException}} when a 
> topic didn't exist. Now it will (eventually) fail with {{TimeoutException}}. 
> It seems this was more or less intentional, even though it would break code 
> which was expecting and handling the {{UnknownTopicOrPartitionException}}. A 
> workaround is to use {{retries=1}} and inspect the cause of the 
> {{TimeoutException}}, but this isn't really suitable for cases where the same 
> Admin client instance is being used for other calls where retries is 
> desirable.
> Furthermore as well as the intended effect on {{listOffsets()}} it seems that 
> the change could actually affect other methods of Admin.
> More generally, the Admin client API is vague about which exceptions can 
> propagate from which methods. This means that it's not possible to say, in 
> cases like this, whether the calling code _should_ have been relying on the 
> {{UnknownTopicOrPartitionException}} or not.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503854#comment-17503854
 ] 

Jun Rao commented on KAFKA-13723:
-

[~xiongqiwu] : Thanks for the explanation. Makes sense. So, this is not an issue.

> max.compaction.lag.ms implemented incorrectly
> -
>
> Key: KAFKA-13723
> URL: https://issues.apache.org/jira/browse/KAFKA-13723
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Jun Rao
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
> max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
> time. 
>  
> The implementation in LogCleanerManager has the following code. The path for 
> earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
> it seems that we should set the delay to 0 so that we could trigger cleaning 
> immediately since the segment has been dirty for longer than 
> max.compaction.lag.ms. 
>  
>  
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : 
> Long = {
> ...
> val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
> val cleanUntilTime = now - maxCompactionLagMs
> if (earliestDirtySegmentTimestamp < cleanUntilTime)
> cleanUntilTime - earliestDirtySegmentTimestamp
> else
> 0L
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13723.
-
Resolution: Not A Problem

> max.compaction.lag.ms implemented incorrectly
> -
>
> Key: KAFKA-13723
> URL: https://issues.apache.org/jira/browse/KAFKA-13723
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Jun Rao
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
> max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
> time. 
>  
> The implementation in LogCleanerManager has the following code. The path for 
> earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
> it seems that we should set the delay to 0 so that we could trigger cleaning 
> immediately since the segment has been dirty for longer than 
> max.compaction.lag.ms. 
>  
>  
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : 
> Long = {
> ...
> val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
> val cleanUntilTime = now - maxCompactionLagMs
> if (earliestDirtySegmentTimestamp < cleanUntilTime)
> cleanUntilTime - earliestDirtySegmentTimestamp
> else
> 0L
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] rhauch merged pull request #11872: KAFKA-12879: Remove extra sleep

2022-03-09 Thread GitBox


rhauch merged pull request #11872:
URL: https://github.com/apache/kafka/pull/11872


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch opened a new pull request #11872: KAFKA-12879: Remove extra sleep

2022-03-09 Thread GitBox


rhauch opened a new pull request #11872:
URL: https://github.com/apache/kafka/pull/11872


   Tests didn't catch the extra sleep that was kept in #11871 after the optimization
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread xiongqi wu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503847#comment-17503847
 ] 

xiongqi wu commented on KAFKA-13723:


[~junrao]  Hi Jun, this function is supposed to capture violations that have 
already passed the max compaction lag. 

e.g., 

if maxCompactionDelay > 0, the policy has been violated (e.g., the log was not 
compacted within the configured max compaction lag time), and the log should be 
compacted immediately. 

if maxCompactionDelay = 0, no violation was found, and the log doesn't need to 
be compacted immediately. 

The name is a little bit misleading. maxCompactionDelay doesn't mean the log 
cleaner should delay until compaction. Instead, it means the delay has already 
happened, and the log should be cleaned immediately. 
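
To make the sign convention concrete, here is a small illustrative Java 
translation of the Scala snippet from the issue (the values in the comments 
and in main are made up for the example):

{code:java}
public class MaxCompactionDelayExample {

    // Mirrors the Scala maxCompactionDelay logic. With a maxCompactionLagMs of
    // 10 minutes and a segment that turned dirty 15 minutes ago, this returns
    // 5 minutes (> 0): the violation already happened, so the log is due for
    // immediate compaction.
    static long maxCompactionDelayMs(final long earliestDirtySegmentTimestampMs,
                                     final long maxCompactionLagMs,
                                     final long nowMs) {
        final long cleanUntilTime = nowMs - Math.max(maxCompactionLagMs, 0L);
        return earliestDirtySegmentTimestampMs < cleanUntilTime
            ? cleanUntilTime - earliestDirtySegmentTimestampMs // overdue by this much
            : 0L;                                              // within the allowed lag
    }

    public static void main(final String[] args) {
        final long now = System.currentTimeMillis();
        final long tenMin = 10 * 60 * 1000L;
        final long fifteenMin = 15 * 60 * 1000L;
        // Prints 300000 (5 minutes): the segment is 5 minutes overdue.
        System.out.println(maxCompactionDelayMs(now - fifteenMin, tenMin, now));
    }
}
{code}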

 

 

> max.compaction.lag.ms implemented incorrectly
> -
>
> Key: KAFKA-13723
> URL: https://issues.apache.org/jira/browse/KAFKA-13723
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Jun Rao
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
> max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
> time. 
>  
> The implementation in LogCleanerManager has the following code. The path for 
> earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
> it seems that we should set the delay to 0 so that we could trigger cleaning 
> immediately since the segment has been dirty for longer than 
> max.compaction.lag.ms. 
>  
>  
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : 
> Long = {
> ...
> val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
> val cleanUntilTime = now - maxCompactionLagMs
> if (earliestDirtySegmentTimestamp < cleanUntilTime)
> cleanUntilTime - earliestDirtySegmentTimestamp
> else
> 0L
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] rhauch merged pull request #11871: KAFKA-12879: Addendum to reduce flakiness of tests

2022-03-09 Thread GitBox


rhauch merged pull request #11871:
URL: https://github.com/apache/kafka/pull/11871


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lihaosky commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore

2022-03-09 Thread GitBox


lihaosky commented on a change in pull request #11802:
URL: https://github.com/apache/kafka/pull/11802#discussion_r823057006



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
##
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Arrays;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
+import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+
+public class PrefixedWindowKeySchemas {
+
+private static final int PREFIX_SIZE = 1;
+private static final byte TIME_FIRST_PREFIX = 0;
+private static final byte KEY_FIRST_PREFIX = 1;
+private static final int SEQNUM_SIZE = 4;
+private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+private static byte extractPrefix(final byte[] binaryBytes) {
+return binaryBytes[0];
+}
+
+public static class TimeFirstWindowKeySchema implements 
RocksDBSegmentedBytesStore.KeySchema {
+
+@Override
+public Bytes upperRange(final Bytes key, final long to) {
+if (key == null) {
+// Put next prefix instead of null so that we can start from 
right prefix
+// when scanning backwards
+final byte nextPrefix = TIME_FIRST_PREFIX + 1;
+return 
Bytes.wrap(ByteBuffer.allocate(PREFIX_SIZE).put(nextPrefix).array());

Review comment:
   Because we have 
   ```
   if (prefix != TIME_FIRST_PREFIX) {
   return false;
   }
   ```
   in hasNextCondition. We need to start from the correct prefix when scanning 
backward. If we put `to`, the starting prefix when we scan backward could be 
`TIME_FIRST_PREFIX + 1`, which would be wrong?
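
   To see why `{1}` works as the upper bound, here is a tiny self-contained 
check of the unsigned lexicographic ordering (illustrative only, not part of 
the PR):
   ```java
   import java.util.Arrays;

   public class PrefixBoundCheck {
       public static void main(String[] args) {
           byte[] timeFirstKey = {0, 0x7f, 0x7f, 0x7f}; // TIME_FIRST_PREFIX + payload
           byte[] upperBound   = {1};                   // TIME_FIRST_PREFIX + 1
           // Every key whose prefix byte is 0 sorts strictly below the
           // one-byte key {1}, so {1} is a valid upper bound for a backward
           // scan over the time-first keyspace.
           System.out.println(Arrays.compareUnsigned(timeFirstKey, upperBound) < 0); // true
       }
   }
   ```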

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/PrefixedWindowKeySchemas.java
##
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Arrays;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import org.apache.kafka.streams.state.StateSerdes;
+
+import static org.apache.kafka.streams.state.StateSerdes.TIMESTAMP_SIZE;
+import static 
org.apache.kafka.streams.state.internals.WindowKeySchema.timeWindowForSize;
+
+public class PrefixedWindowKeySchemas {
+
+private static final int PREFIX_SIZE = 1;
+private static final byte TIME_FIRST_PREFIX = 0;
+private static final byte KEY_FIRST_PREFIX = 1;
+private static final int SEQNUM_SIZE = 4;
+private static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;
+
+private static byte extractPrefix(final byte[] binaryBytes) {
+return binaryBytes[0];
+}
+
+public static class TimeFirstWindowKeySchema implements 

[GitHub] [kafka] philipnee commented on pull request #11871: Addendum to KAFKA 12879 to fix flaky tests

2022-03-09 Thread GitBox


philipnee commented on pull request #11871:
URL: https://github.com/apache/kafka/pull/11871#issuecomment-1063324518


   Note: See the commit 
https://github.com/apache/kafka/commit/28393be6d7416f51eb51f7fe2b075570b45ef09f


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] philipnee opened a new pull request #11871: Addendum to KAFKA 12879 to fix flaky tests

2022-03-09 Thread GitBox


philipnee opened a new pull request #11871:
URL: https://github.com/apache/kafka/pull/11871


   This is an addendum to KAFKA-12879 to fix the flaky tests introduced in 
the pull request.
   - Add an if check to avoid calling sleep(0) (a sketch of the guard follows below)
   - Increase the timeouts in the tests
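
   A minimal sketch of that guard (the class and parameter names are assumed 
for illustration, not taken from the actual patch):
   ```java
   import org.apache.kafka.common.utils.Time;

   final class RetryBackoff {
       // Skip the clock call entirely when there is nothing to wait for, so
       // retry loops never end up invoking time.sleep(0).
       static void maybeSleep(final Time time, final long deadlineMs, final long retryBackoffMs) {
           final long remainingMs = deadlineMs - time.milliseconds();
           if (remainingMs > 0) {
               time.sleep(Math.min(remainingMs, retryBackoffMs));
           }
       }
   }
   ```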
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13723?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503813#comment-17503813
 ] 

Jun Rao commented on KAFKA-13723:
-

[~xiongqiwu] and [~jjkoshy]  : Could you check if this is a real issue? Thanks.

> max.compaction.lag.ms implemented incorrectly
> -
>
> Key: KAFKA-13723
> URL: https://issues.apache.org/jira/browse/KAFKA-13723
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0
>Reporter: Jun Rao
>Priority: Major
>
> In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
> max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
> time. 
>  
> The implementation in LogCleanerManager has the following code. The path for 
> earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
> it seems that we should set the delay to 0 so that we could trigger cleaning 
> immediately since the segment has been dirty for longer than 
> max.compaction.lag.ms. 
>  
>  
> {code:java}
> def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : 
> Long = {
> ...
> val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
> val cleanUntilTime = now - maxCompactionLagMs
> if (earliestDirtySegmentTimestamp < cleanUntilTime)
> cleanUntilTime - earliestDirtySegmentTimestamp
> else
> 0L
> }{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13723) max.compaction.lag.ms implemented incorrectly

2022-03-09 Thread Jun Rao (Jira)
Jun Rao created KAFKA-13723:
---

 Summary: max.compaction.lag.ms implemented incorrectly
 Key: KAFKA-13723
 URL: https://issues.apache.org/jira/browse/KAFKA-13723
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.0
Reporter: Jun Rao


In https://issues.apache.org/jira/browse/KAFKA-7321, we introduced 
max.compaction.lag.ms to guarantee that a record be cleaned before a certain 
time. 

 

The implementation in LogCleanerManager has the following code. The path for 
earliestDirtySegmentTimestamp < cleanUntilTime seems incorrect. In that case, 
it seems that we should set the delay to 0 so that we could trigger cleaning 
immediately since the segment has been dirty for longer than 
max.compaction.lag.ms. 

 

 
{code:java}
def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long): Long = {

  ...

  val maxCompactionLagMs = math.max(log.config.maxCompactionLagMs, 0L)
  val cleanUntilTime = now - maxCompactionLagMs

  if (earliestDirtySegmentTimestamp < cleanUntilTime)
    cleanUntilTime - earliestDirtySegmentTimestamp
  else
    0L
}{code}
 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] guozhangwang commented on a change in pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore

2022-03-09 Thread GitBox


guozhangwang commented on a change in pull request #11802:
URL: https://github.com/apache/kafka/pull/11802#discussion_r822940645



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStore.java
##
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Optional;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import 
org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
+import 
org.apache.kafka.streams.processor.internals.StoreToProcessorContextAdapter;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.IQ_CONSISTENCY_OFFSET_VECTOR_ENABLED;
+
+public abstract class AbstractDualSchemaRocksDBSegmentedBytesStore<S extends Segment> implements SegmentedBytesStore {
+private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+
+private final String name;
+protected final AbstractSegments<S> segments;
+private final String metricScope;
+protected final KeySchema baseKeySchema;
+protected final Optional<KeySchema> indexKeySchema;
+
+
+private ProcessorContext context;
+private StateStoreContext stateStoreContext;
+private Sensor expiredRecordSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean consistencyEnabled = false;
+private Position position;
+protected OffsetCheckpoint positionCheckpoint;
+private volatile boolean open;
+
+AbstractDualSchemaRocksDBSegmentedBytesStore(final String name,
+ final String metricScope,
+ final KeySchema baseKeySchema,
+ final Optional<KeySchema> indexKeySchema,
+ final AbstractSegments<S> segments) {
+this.name = name;
+this.metricScope = metricScope;
+this.baseKeySchema = baseKeySchema;
+this.indexKeySchema = indexKeySchema;
+this.segments = segments;
+}
+
+@Override
+public KeyValueIterator<Bytes, byte[]> all() {
+final List<S> searchSpace = segments.allSegments(true);
+
+return new SegmentIterator<>(
+searchSpace.iterator(),
+baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
+null,
+null,
+true);
+}
+
+@Override
+public KeyValueIterator<Bytes, byte[]> backwardAll() {
+final List<S> searchSpace = segments.allSegments(false);
+
+return new SegmentIterator<>(
+searchSpace.iterator(),
+baseKeySchema.hasNextCondition(null, null, 0, Long.MAX_VALUE),
+null,
+null,
+false);
+}
+
+@Override
+public void remove(final Bytes rawKey) {
+final long timestamp = baseKeySchema.segmentTimestamp(rawKey);
+observedStreamTime = Math.max(observedStreamTime, timestamp);
+final S segment = segments.getSegmentForTimestamp(timestamp);
+if (segment == null) {
+return;
+}
+segment.delete(rawKey);
+}
+
+   

[GitHub] [kafka] junrao commented on pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured

2022-03-09 Thread GitBox


junrao commented on pull request #11811:
URL: https://github.com/apache/kafka/pull/11811#issuecomment-1063288699


   @kirktrue : We could just make the changes to expose the proper public API 
and send an update to the original KIP-768 voting thread so that people are 
aware of the minor public changes if any.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] blcksrx commented on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient

2022-03-09 Thread GitBox


blcksrx commented on pull request #11838:
URL: https://github.com/apache/kafka/pull/11838#issuecomment-1063283897


   @mimaison 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one

2022-03-09 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503785#comment-17503785
 ] 

Jun Rao commented on KAFKA-10690:
-

[~ocadaruma] : Thanks for filing the jira. Have you tried enabling replication 
throttling? This will help prevent the out-of-sync replicas from pulling data 
too aggressively. 
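
For reference, a minimal sketch of enabling broker-side replication throttling 
with the Java admin client (the broker id and rate below are illustrative, and 
the topic-level leader/follower.replication.throttled.replicas configs also 
need to be set for the throttle to apply):

{code:java}
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class ReplicationThrottleExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative
        try (Admin admin = Admin.create(props)) {
            // Throttle both leader- and follower-side replication on broker 1
            // to 10 MB/s while the reassignment is in flight.
            final ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "1");
            final AlterConfigOp leaderRate = new AlterConfigOp(
                new ConfigEntry("leader.replication.throttled.rate", "10485760"),
                AlterConfigOp.OpType.SET);
            final AlterConfigOp followerRate = new AlterConfigOp(
                new ConfigEntry("follower.replication.throttled.rate", "10485760"),
                AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(Map.of(broker, List.of(leaderRate, followerRate)))
                 .all().get();
        }
    }
}
{code}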

> Produce-response delay caused by lagging replica fetch which affects in-sync 
> one
> 
>
> Key: KAFKA-10690
> URL: https://issues.apache.org/jira/browse/KAFKA-10690
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Haruki Okada
>Priority: Major
> Attachments: image-2020-11-06-11-15-21-781.png, 
> image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png
>
>
> h2. Our environment
>  * Kafka version: 2.4.1
> h2. Phenomenon
>  * Produce response time 99th (remote scope) degrades to 500ms, which is 20 
> times worse than usual
>  ** Meanwhile, the cluster was running replica reassignment to service-in a 
> new machine to recover the replicas that had been held by a failed (hardware 
> issue) broker machine
> !image-2020-11-06-11-15-21-781.png|width=292,height=166!
> h2. Analysis
> Let's say
>  * broker-X: The broker we observed produce latency degradation
>  * broker-Y: The broker under servicing-in
> broker-Y was catching up replicas of partitions:
>  * partition-A: has relatively small log size
>  * partition-B: has large log size
> (actually, broker-Y was catching up on many other partitions; I noted only 
> two partitions here to keep the explanation simple)
> broker-X was the leader for both partition-A and partition-B.
> We found that both partition-A and partition-B are assigned to same 
> ReplicaFetcherThread of broker-Y, and produce latency started to degrade 
> right after broker-Y finished catching up partition-A.
> !image-2020-11-06-11-17-09-910.png|width=476,height=174!
> Besides, we observed disk reads on broker-X during service-in. (This is 
> natural since old segments are likely not in page cache)
> !image-2020-11-06-11-15-38-390.png|width=292,height=193!
> So we suspected that:
>  * In-sync replica fetch (partition-A) was held up by the lagging replica 
> fetch (partition-B), which is expected to be slow because it causes actual 
> disk reads
>  ** Since ReplicaFetcherThread sends fetch requests in a blocking manner, the 
> next fetch request can't be sent until the previous one completes
>  ** => Causes in-sync replica fetches for partitions assigned to the same 
> replica fetcher thread to be delayed
>  ** => Causes remote-scope produce latency degradation
> h2. Possible fix
> We think this issue can be addressed by dedicating part of the 
> ReplicaFetcherThreads (or creating another thread pool) to lagging-replica 
> catch-up, but we are not so sure this is the appropriate way.
> Please give your opinions about this issue.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] rhauch merged pull request #11797: KAFKA-12879: Revert changes from KAFKA-12339 and instead add retry capability to KafkaBasedLog

2022-03-09 Thread GitBox


rhauch merged pull request #11797:
URL: https://github.com/apache/kafka/pull/11797


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck edited a comment on pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-09 Thread GitBox


bbejeck edited a comment on pull request #11870:
URL: https://github.com/apache/kafka/pull/11870#issuecomment-1063212582


   @ijuma - that was a while ago and I can't remember exactly, but swallowing 
compiler errors is not good.  So I think we should go ahead with 
@lbradstreet's changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-09 Thread GitBox


bbejeck commented on pull request #11870:
URL: https://github.com/apache/kafka/pull/11870#issuecomment-1063212582


   @ijuma - that was a while ago I can't remember exactly, but I think we 
should go ahead with @lbradstreet's changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13722) Update internal interfaces that use ProcessorContext to use StateStoreContext instead

2022-03-09 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13722:
-

 Summary: Update internal interfaces that use ProcessorContext to 
use StateStoreContext instead
 Key: KAFKA-13722
 URL: https://issues.apache.org/jira/browse/KAFKA-13722
 Project: Kafka
  Issue Type: Improvement
Reporter: Guozhang Wang


This is a remainder that when we remove the deprecated public APIs that uses 
the ProcessorContext, like `StateStore.init`, we should also consider updating 
the internal interfaces with the ProcessorContext as well. That includes:

1. Segments and related util classes which use ProcessorContext.
2. For state stores that leverage on ProcessorContext.getXXXTime, their logic 
should be moved out of the state store impl but to the processor node level 
that calls on these state stores.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] guozhangwang commented on pull request #11802: [RFC][1/N]add new RocksDBTimeOrderedWindowStore

2022-03-09 Thread GitBox


guozhangwang commented on pull request #11802:
URL: https://github.com/apache/kafka/pull/11802#issuecomment-1063193852


   @lihaosky there are some `streams:checkstyleTest` failures in Jenkins that 
should be resolved. Could you take a look?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on pull request #11811: (docs) Add JavaDocs for org.apache.kafka.common.security.oauthbearer.secured

2022-03-09 Thread GitBox


kirktrue commented on pull request #11811:
URL: https://github.com/apache/kafka/pull/11811#issuecomment-1063190001


   No, all of the classes (public and non-public) are in the package together. 
Separating them out should be trivial from a code standpoint, but please advise 
on what is needed to make that change the proper way from a process standpoint 
(KIP, Jira, etc.).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue edited a comment on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient

2022-03-09 Thread GitBox


kirktrue edited a comment on pull request #11838:
URL: https://github.com/apache/kafka/pull/11838#issuecomment-1063188384


   Thanks for the PR! It looks good to me. You need to get a committer to look 
at this in order to get it merged in. Perhaps a `git blame` can turn up someone 
who's worked on this that you can ping.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kirktrue commented on pull request #11838: KAFKA-1372: separate 400 error from 500 error in RestClient

2022-03-09 Thread GitBox


kirktrue commented on pull request #11838:
URL: https://github.com/apache/kafka/pull/11838#issuecomment-1063188384


   You need to get a committer to look at this for merging. Perhaps a `git 
blame` can turn up someone who's worked on this that you can ping.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-09 Thread GitBox


ijuma commented on pull request #11870:
URL: https://github.com/apache/kafka/pull/11870#issuecomment-1063182074


   @bbejeck Do you recall why we did this originally?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13721) Left-join still emit spurious results in stream-stream joins in some cases

2022-03-09 Thread Nollet (Jira)
Nollet created KAFKA-13721:
--

 Summary: Left-join still emit spurious results in stream-stream 
joins in some cases
 Key: KAFKA-13721
 URL: https://issues.apache.org/jira/browse/KAFKA-13721
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.1.0
Reporter: Nollet


Stream-stream joins seem to still emit spurious results for some window 
configurations.

From my tests, it happened when setting before to 0 and having a grace period 
smaller than the window duration. More precisely, it seems to happen when 
setting before and
window duration > grace period + before
h2. how to reproduce
{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;

public class SpuriousLeftJoinTest {

    static final Duration WINDOW_DURATION = Duration.ofMinutes(10);
    static final Duration GRACE = Duration.ofMinutes(6);
    static final Duration BEFORE = Duration.ZERO;
    static final String LEFT_TOPIC_NAME = "LEFT_TOPIC";
    static final String RIGHT_TOPIC_NAME = "RIGHT_TOPIC";
    static final String OUTPUT_TOPIC_NAME = "OUTPUT_TOPIC";

    private static TopologyTestDriver testDriver;
    private static TestInputTopic<String, Integer> inputTopicLeft;
    private static TestInputTopic<String, Integer> inputTopicRight;
    private static TestOutputTopic<String, Integer> outputTopic;

    public static Topology createTopology() {

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Integer> leftStream = builder.stream(LEFT_TOPIC_NAME);
        KStream<String, Integer> rightStream = builder.stream(RIGHT_TOPIC_NAME);

        // return 1 if left join matched, otherwise 0
        KStream<String, Integer> joined = leftStream.leftJoin(
                rightStream,
                (value1, value2) -> {
                    if (value2 == null) {
                        return 0;
                    }
                    return 1;
                },
                JoinWindows.ofTimeDifferenceAndGrace(WINDOW_DURATION, GRACE)
                        .before(BEFORE)
        );

        joined.to(OUTPUT_TOPIC_NAME);

        return builder.build();
    }

    @Before
    public void setup() {

        Topology topology = createTopology();

        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class);

        testDriver = new TopologyTestDriver(topology, props);

        inputTopicLeft = testDriver.createInputTopic(LEFT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());
        inputTopicRight = testDriver.createInputTopic(RIGHT_TOPIC_NAME, Serdes.String().serializer(), Serdes.Integer().serializer());

        outputTopic = testDriver.createOutputTopic(OUTPUT_TOPIC_NAME, Serdes.String().deserializer(), Serdes.Integer().deserializer());
    }

    @After
    public void tearDown() {
        testDriver.close();
    }

    @Test
    public void shouldEmitOnlyOneMessageForKey1() {
        Instant now = Instant.now();
        inputTopicLeft.pipeInput("key1", 12, now);
        inputTopicRight.pipeInput("key1", 13, now.plus(WINDOW_DURATION));

        // send later record to increase stream time & close the window
        inputTopicLeft.pipeInput("other_key", 1212122, now.plus(WINDOW_DURATION).plus(GRACE).plusSeconds(10));

        while (!outputTopic.isEmpty()) {
            System.out.println(outputTopic.readKeyValue());
        }
    }
}
{code}
The stdout of the previous code is
{noformat}
KeyValue(key1, 0)
KeyValue(key1, 1)
{noformat}
However, it should be
{noformat}
KeyValue(key1, 1)
{noformat}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] lbradstreet opened a new pull request #11870: MINOR: jmh.sh swallows compile errors

2022-03-09 Thread GitBox


lbradstreet opened a new pull request #11870:
URL: https://github.com/apache/kafka/pull/11870


   jmh.sh runs tasks in quiet mode, which swallows compiler errors. This is a pain, and I frequently have to edit the shell script to see the error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-8065) Forwarding modified timestamps does not reset timestamp correctly

2022-03-09 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat reassigned KAFKA-8065:
-

Assignee: (was: Josep Prat)

> Forwarding modified timestamps does not reset timestamp correctly
> -
>
> Key: KAFKA-8065
> URL: https://issues.apache.org/jira/browse/KAFKA-8065
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1, 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Priority: Major
> Fix For: 2.2.0, 2.0.2, 2.1.2
>
>
> Using Processor API, users can set a new output record timestamp via 
> `context.forward(..., To.all().withTimestamp(...))`. However, after the 
> forward()-call returns, the timestamp is not reset to the original input 
> record timestamp and thus a consecutive call to `context.forward(...)` 
> without `To` will use the newly set output record timestamp from before, too.
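
For illustration, a hedged sketch of the sequence described above using the old Processor API (the processor name here is hypothetical):

{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.To;

// Hypothetical processor showing the described bug: the second forward()
// unintentionally reuses the timestamp set by the first one instead of
// the original input record timestamp.
public class ForwardTimestampExample extends AbstractProcessor<String, String> {
    @Override
    public void process(final String key, final String value) {
        context().forward(key, value, To.all().withTimestamp(42L)); // explicit output timestamp
        context().forward(key, value); // expected: input record timestamp; actual: 42
    }
}
{code}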



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Assigned] (KAFKA-8065) Forwarding modified timestamps does not reset timestamp correctly

2022-03-09 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat reassigned KAFKA-8065:
-

Assignee: Josep Prat  (was: Matthias J. Sax)

> Forwarding modified timestamps does not reset timestamp correctly
> -
>
> Key: KAFKA-8065
> URL: https://issues.apache.org/jira/browse/KAFKA-8065
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1, 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Josep Prat
>Priority: Major
> Fix For: 2.2.0, 2.0.2, 2.1.2
>
>
> Using Processor API, users can set a new output record timestamp via 
> `context.forward(..., To.all().withTimestamp(...))`. However, after the 
> forward()-call returns, the timestamp is not reset to the original input 
> record timestamp and thus a consecutive call to `context.forward(...)` 
> without `To` will use the newly set output record timestamp from before, too.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty

2022-03-09 Thread GitBox


dajac commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r822676784



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
 public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
 invokeCompletedOffsetCommitCallbacks();
 
-RequestFuture<Void> future =  null;
-if (!coordinatorUnknown()) {
+RequestFuture<Void> future = null;
+if (offsets.isEmpty()) {
+// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+future = doCommitOffsetsAsync(offsets, callback);
+} else if (!coordinatorUnknown()) {
 future = doCommitOffsetsAsync(offsets, callback);

Review comment:
   There is already a comment for that branch. Don't you think that this is 
enough?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-03-09 Thread GitBox


ijuma merged pull request #11722:
URL: https://github.com/apache/kafka/pull/11722


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #11722: KAFKA-13630: reduce amount of time that producer network thread holds batch queue lock

2022-03-09 Thread GitBox


ijuma commented on a change in pull request #11722:
URL: https://github.com/apache/kafka/pull/11722#discussion_r822653150



##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
##
@@ -448,40 +448,48 @@ public ReadyCheckResult ready(Cluster cluster, long nowMs) {
 boolean exhausted = this.free.queued() > 0;
 for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
 Deque<ProducerBatch> deque = entry.getValue();
+
+final ProducerBatch batch;
+final long waitedTimeMs;
+final boolean backingOff;
+final boolean full;
+
+// Collect as little as possible inside critical region, determine outcome after release
 synchronized (deque) {
-// When producing to a large number of partitions, this path is hot and deques are often empty.
-// We check whether a batch exists first to avoid the more expensive checks whenever possible.
-ProducerBatch batch = deque.peekFirst();
-if (batch != null) {
-TopicPartition part = entry.getKey();
-Node leader = cluster.leaderFor(part);
-if (leader == null) {
-// This is a partition for which leader is not known, but messages are available to send.
-// Note that entries are currently not removed from batches when deque is empty.
-unknownLeaderTopics.add(part.topic());
-} else if (!readyNodes.contains(leader) && !isMuted(part)) {
-long waitedTimeMs = batch.waitedTimeMs(nowMs);
-boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
-long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
-boolean full = deque.size() > 1 || batch.isFull();
-boolean expired = waitedTimeMs >= timeToWaitMs;
-boolean transactionCompleting = transactionManager != null && transactionManager.isCompleting();
-boolean sendable = full
-|| expired
-|| exhausted
-|| closed
-|| flushInProgress()
-|| transactionCompleting;
-if (sendable && !backingOff) {
-readyNodes.add(leader);
-} else {
-long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
-// Note that this results in a conservative estimate since an un-sendable partition may have
-// a leader that will later be found to have sendable data. However, this is good enough
-// since we'll just wake up and then sleep again for the remaining time.
-nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
-}
-}
+batch = deque.peekFirst();

Review comment:
   Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty

2022-03-09 Thread GitBox


showuon commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r822634915



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
 public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
 invokeCompletedOffsetCommitCallbacks();
 
-RequestFuture<Void> future =  null;
-if (!coordinatorUnknown()) {
+RequestFuture<Void> future = null;
+if (offsets.isEmpty()) {
+// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+future = doCommitOffsetsAsync(offsets, callback);
+} else if (!coordinatorUnknown()) {
 future = doCommitOffsetsAsync(offsets, callback);

Review comment:
   I wonder if other devs would think this is a mistake when seeing it. Or at least we should leave a comment to mention this is on purpose. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10690) Produce-response delay caused by lagging replica fetch which affects in-sync one

2022-03-09 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10690?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503574#comment-17503574
 ] 

Luke Chen commented on KAFKA-10690:
---

[~ocadaruma], thanks for reporting. One question:

Are you sure this issue is due to the `in-sync` replica fetch? Could you build a PoC that adds an additional thread pool for lagging replicas to confirm this 
solution?

Thank you.

> Produce-response delay caused by lagging replica fetch which affects in-sync 
> one
> 
>
> Key: KAFKA-10690
> URL: https://issues.apache.org/jira/browse/KAFKA-10690
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Haruki Okada
>Priority: Major
> Attachments: image-2020-11-06-11-15-21-781.png, 
> image-2020-11-06-11-15-38-390.png, image-2020-11-06-11-17-09-910.png
>
>
> h2. Our environment
>  * Kafka version: 2.4.1
> h2. Phenomenon
>  * Produce response time 99th (remote scope) degrades to 500ms, which is 20 
> times worse than usual
>  ** Meanwhile, the cluster was running replica reassignment to service-in new 
> machine to recover replicas which held by failed (Hardware issue) broker 
> machine
> !image-2020-11-06-11-15-21-781.png|width=292,height=166!
> h2. Analysis
> Let's say
>  * broker-X: The broker we observed produce latency degradation
>  * broker-Y: The broker under servicing-in
> broker-Y was catching up replicas of partitions:
>  * partition-A: has relatively small log size
>  * partition-B: has large log size
> (actually, broker-Y was catching-up many other partitions. I noted only two 
> partitions here to make explanation simple)
> broker-X was the leader for both partition-A and partition-B.
> We found that both partition-A and partition-B are assigned to same 
> ReplicaFetcherThread of broker-Y, and produce latency started to degrade 
> right after broker-Y finished catching up partition-A.
> !image-2020-11-06-11-17-09-910.png|width=476,height=174!
> Besides, we observed disk reads on broker-X during service-in. (This is 
> natural since old segments are likely not in page cache)
> !image-2020-11-06-11-15-38-390.png|width=292,height=193!
> So we suspected that:
>  * In-sync replica fetch (partition-A) was involved by lagging replica fetch 
> (partition-B), which should be slow because it causes actual disk reads
>  ** Since ReplicaFetcherThread sends fetch requests in blocking manner, next 
> fetch request can't be sent until one fetch request completes
>  ** => Causes in-sync replica fetch for partitions assigned to same replica 
> fetcher thread to delay
>  ** => Causes remote scope produce latency degradation
> h2. Possible fix
> We think this issue can be addressed by designating part of 
> ReplicaFetcherThread (or creating another thread pool) for lagging replica 
> catching-up, but not so sure this is the appropriate way.
> Please give your opinions about this issue.
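
For illustration, a minimal sketch of the separate-thread-pool idea from the description above. This is not the actual fetcher code (which is Scala and blocking); all names and pool sizes here are hypothetical:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: route fetches for lagging partitions to a separate
// pool so slow (disk-bound) catch-up reads cannot block in-sync fetches.
public class PartitionedFetcherPool {
    private final ExecutorService inSyncPool = Executors.newFixedThreadPool(4);
    private final ExecutorService laggingPool = Executors.newFixedThreadPool(2);
    private final long lagThreshold;

    public PartitionedFetcherPool(final long lagThreshold) {
        this.lagThreshold = lagThreshold;
    }

    public void submitFetch(final long partitionLag, final Runnable fetchTask) {
        if (partitionLag > lagThreshold) {
            laggingPool.submit(fetchTask); // likely disk reads; keep isolated
        } else {
            inSyncPool.submit(fetchTask);  // hot path, page-cache reads
        }
    }
}
{code}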



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dajac commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty

2022-03-09 Thread GitBox


dajac commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r822622560



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
 public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
 invokeCompletedOffsetCommitCallbacks();
 
-RequestFuture<Void> future =  null;
-if (!coordinatorUnknown()) {
+RequestFuture<Void> future = null;
+if (offsets.isEmpty()) {
+// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+future = doCommitOffsetsAsync(offsets, callback);
+} else if (!coordinatorUnknown()) {
 future = doCommitOffsetsAsync(offsets, callback);

Review comment:
   I considered this but I think that it is clearer when kept separated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11864: KAFKA-13717: skip coordinator lookup in commitOffsetsAsync if offsets is empty

2022-03-09 Thread GitBox


showuon commented on a change in pull request #11864:
URL: https://github.com/apache/kafka/pull/11864#discussion_r822620734



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -1000,8 +1000,11 @@ void invokeCompletedOffsetCommitCallbacks() {
 public RequestFuture<Void> commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
 invokeCompletedOffsetCommitCallbacks();
 
-RequestFuture<Void> future =  null;
-if (!coordinatorUnknown()) {
+RequestFuture<Void> future = null;
+if (offsets.isEmpty()) {
+// No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
+future = doCommitOffsetsAsync(offsets, callback);
+} else if (!coordinatorUnknown()) {
 future = doCommitOffsetsAsync(offsets, callback);

Review comment:
   Could we do this and add the comment above?
   ```java
   // No need to check coordinator if offsets is empty since commit of empty offsets is completed locally.
   if (offsets.isEmpty() || !coordinatorUnknown()) {
       future = doCommitOffsetsAsync(offsets, callback);
   }
   ```
   

##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -2483,6 +2483,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 }
   }
 
+
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateAndCloseConsumerWithNoAccess(quorum: String): Unit = {
+val consumer = createConsumer()
+try {
+  // Close consumer without consuming anything. close() call should pass successfully and throw no exception.
+  consumer.close()
+} catch {
+  case e: Throwable =>
+fail(s"Exception not expected on closing consumer: $e")
+}

Review comment:
   Could we use `assertDoesNotThrow` instead? I.e.:
   ```scala
   assertDoesNotThrow(() => consumer.close(), s"Exception not expected on closing consumer")
   ```
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure

2022-03-09 Thread GitBox


soarez commented on pull request #9577:
URL: https://github.com/apache/kafka/pull/9577#issuecomment-1062889765


   @mumrah @hachikuji @bbejeck can anyone review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abdulshaikh76 commented on pull request #8831: KAFKA-8657:Client-side automatic topic creation on Producer

2022-03-09 Thread GitBox


abdulshaikh76 commented on pull request #8831:
URL: https://github.com/apache/kafka/pull/8831#issuecomment-1062882178


   Are these changes available? In which version of the kafka-client jar?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sunshujie1990 commented on pull request #11869: KAFKA-13719: fix connector restart cause duplicate tasks

2022-03-09 Thread GitBox


sunshujie1990 commented on pull request #11869:
URL: https://github.com/apache/kafka/pull/11869#issuecomment-1062864985


   @hachikuji Jason, please help review this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13720) Few topic partitions remain under replicated after broker lose connectivity to zookeeper

2022-03-09 Thread Dhirendra Singh (Jira)
Dhirendra Singh created KAFKA-13720:
---

 Summary: Few topic partitions remain under replicated after broker 
lose connectivity to zookeeper
 Key: KAFKA-13720
 URL: https://issues.apache.org/jira/browse/KAFKA-13720
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 2.7.1
Reporter: Dhirendra Singh


A few topic partitions remain under-replicated after brokers lose connectivity to 
ZooKeeper.
It only happens when brokers lose connectivity to ZooKeeper and this results in 
a change of active controller. The issue does not always occur, but happens randomly.
The issue never occurs when the active controller does not change after brokers 
lose connectivity to ZooKeeper.
The following error message was found in the log file.


[2022-02-28 04:01:20,217] WARN [Partition __consumer_offsets-4 broker=1] 
Controller failed to update ISR to PendingExpandIsr(isr=Set(1), 
newInSyncReplicaId=2) due to unexpected UNKNOWN_SERVER_ERROR. Retrying. 
(kafka.cluster.Partition)
[2022-02-28 04:01:20,217] ERROR [broker-1-to-controller] Uncaught error in 
request completion: (org.apache.kafka.clients.NetworkClient)
java.lang.IllegalStateException: Failed to enqueue `AlterIsr` request with 
state LeaderAndIsr(leader=1, leaderEpoch=2728, isr=List(1, 2), zkVersion=4719) 
for partition __consumer_offsets-4
at kafka.cluster.Partition.sendAlterIsrRequest(Partition.scala:1403)
at 
kafka.cluster.Partition.$anonfun$handleAlterIsrResponse$1(Partition.scala:1438)
at kafka.cluster.Partition.handleAlterIsrResponse(Partition.scala:1417)
at kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1(Partition.scala:1398)
at 
kafka.cluster.Partition.$anonfun$sendAlterIsrRequest$1$adapted(Partition.scala:1398)
at 
kafka.server.AlterIsrManagerImpl.$anonfun$handleAlterIsrResponse$8(AlterIsrManager.scala:166)
at 
kafka.server.AlterIsrManagerImpl.$anonfun$handleAlterIsrResponse$8$adapted(AlterIsrManager.scala:163)
at scala.collection.immutable.List.foreach(List.scala:333)
at 
kafka.server.AlterIsrManagerImpl.handleAlterIsrResponse(AlterIsrManager.scala:163)
at kafka.server.AlterIsrManagerImpl.responseHandler$1(AlterIsrManager.scala:94)
at 
kafka.server.AlterIsrManagerImpl.$anonfun$sendRequest$2(AlterIsrManager.scala:104)
at 
kafka.server.BrokerToControllerRequestThread.handleResponse(BrokerToControllerChannelManagerImpl.scala:175)
at 
kafka.server.BrokerToControllerRequestThread.$anonfun$generateRequests$1(BrokerToControllerChannelManagerImpl.scala:158)
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
at 
org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:586)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:578)
at kafka.common.InterBrokerSendThread.doWork(InterBrokerSendThread.scala:71)
at 
kafka.server.BrokerToControllerRequestThread.doWork(BrokerToControllerChannelManagerImpl.scala:183)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
 
The under-replicated partition count goes to zero after the controller broker is restarted 
again, but this requires manual intervention.
The expectation is that when brokers reconnect with ZooKeeper, the cluster should come 
back to a stable state with an under-replicated count of zero by itself, without any 
manual intervention.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] (KAFKA-13719) connector restart cause duplicate tasks

2022-03-09 Thread Shujie Sun (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-13719 ]


Shujie Sun deleted comment on KAFKA-13719:


was (Author: sunshujie):
https://github.com/apache/kafka/pull/11869

> connector restart cause duplicate tasks
> ---
>
> Key: KAFKA-13719
> URL: https://issues.apache.org/jira/browse/KAFKA-13719
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Shujie Sun
>Priority: Critical
>
> Restarting a connector with parameters includeTasks=true&onlyFailed=false causes 
> duplicate tasks and duplicate messages.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13719) connector restart cause duplicate tasks

2022-03-09 Thread Shujie Sun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13719?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503502#comment-17503502
 ] 

Shujie Sun commented on KAFKA-13719:


https://github.com/apache/kafka/pull/11869

> connector restart cause duplicate tasks
> ---
>
> Key: KAFKA-13719
> URL: https://issues.apache.org/jira/browse/KAFKA-13719
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Shujie Sun
>Priority: Critical
>
> Restarting a connector with parameters includeTasks=true&onlyFailed=false causes 
> duplicate tasks and duplicate messages.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Updated] (KAFKA-13719) connector restart cause duplicate tasks

2022-03-09 Thread Shujie Sun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shujie Sun updated KAFKA-13719:
---
Attachment: (was: image-2022-03-09-18-57-09-467.png)

> connector restart cause duplicate tasks
> ---
>
> Key: KAFKA-13719
> URL: https://issues.apache.org/jira/browse/KAFKA-13719
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Shujie Sun
>Priority: Critical
>
> Restarting a connector with parameters includeTasks=true&onlyFailed=false causes 
> duplicate tasks and duplicate messages.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] sunshujie1990 opened a new pull request #11869: KAFKA-13719: fix connector restart cause duplicate tasks

2022-03-09 Thread GitBox


sunshujie1990 opened a new pull request #11869:
URL: https://github.com/apache/kafka/pull/11869


   When Kafka Connect restarts a connector with includeTasks=true, DistributedHerder starts all tasks without filtering by the currentAssignments. This results in duplicate tasks and duplicate data; a rough sketch of the missing filter is below.
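
   For illustration, a rough sketch of the kind of filtering the fix adds (all names here are hypothetical; the real logic lives in DistributedHerder):
   ```java
   import java.util.List;
   import java.util.Set;
   import java.util.stream.Collectors;

   // Hypothetical sketch: restart only the tasks present in this worker's
   // current assignment, so a restart cannot spawn tasks owned elsewhere.
   public final class RestartFilter {
       public static List<String> tasksToRestart(final List<String> connectorTasks,
                                                 final Set<String> currentAssignment) {
           return connectorTasks.stream()
                   .filter(currentAssignment::contains)
                   .collect(Collectors.toList());
       }
   }
   ```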


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13719) connector restart cause duplicate tasks

2022-03-09 Thread Shujie Sun (Jira)
Shujie Sun created KAFKA-13719:
--

 Summary: connector restart cause duplicate tasks
 Key: KAFKA-13719
 URL: https://issues.apache.org/jira/browse/KAFKA-13719
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.0.0
Reporter: Shujie Sun
 Attachments: image-2022-03-09-18-57-09-467.png

Restarting a connector with parameters includeTasks=true&onlyFailed=false causes 
duplicate tasks and duplicate messages.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Commented] (KAFKA-13699) ProcessorContext does not expose Stream Time

2022-03-09 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13699?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503493#comment-17503493
 ] 

Mickael Maison commented on KAFKA-13699:


Even if it was voted on and partly landed in 3.0.0, I think we should only add the 
missing APIs in 3.2.0.

> ProcessorContext does not expose Stream Time
> 
>
> Key: KAFKA-13699
> URL: https://issues.apache.org/jira/browse/KAFKA-13699
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Shay Lin
>Priority: Major
>  Labels: newbie
>
> As a KS developer, I would like to leverage 
> [KIP-622|https://cwiki.apache.org/confluence/display/KAFKA/KIP-622%3A+Add+currentSystemTimeMs+and+currentStreamTimeMs+to+ProcessorContext]
>  and access stream time in Processor Context.
> _(Updated)_
> However, the methods currentStreamTimeMs and currentSystemTimeMs are missing 
> for KStreams 3.0+.
> Checked with [~mjsax] , the methods are absent from the Processor API , i.e.
>  * org.apache.kafka.streams.processor.api.ProcessorContext



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on pull request #11817: KAFKA-13438: Replace EasyMock and PowerMock with Mockito in WorkerTest

2022-03-09 Thread GitBox


mimaison commented on pull request #11817:
URL: https://github.com/apache/kafka/pull/11817#issuecomment-1062783063


   Thanks for the update. There's an unused import that is failing 
[checkstyle](https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-11817/3/pipeline):
   ```
   [ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-11817/connect/runtime/src/test/java/org/apache/kafka/connect/connector/policy/BaseConnectorClientConfigOverridePolicyTest.java:23:8:
 Unused import - org.apache.kafka.connect.runtime.WorkerTest. [UnusedImports]
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #11868: KAFKA-12648: fix flaky #shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning

2022-03-09 Thread GitBox


ableegoldman opened a new pull request #11868:
URL: https://github.com/apache/kafka/pull/11868


   This test has started to become flaky at a relatively low, but consistently reproducible, rate. Upon inspection, we find this is due to IOExceptions during the #cleanUpNamedTopology call -- specifically, most often a `DirectoryNotEmptyException` with an occasional `FileNotFoundException`.
   
   Basically, signs pointed to having returned from/completed the `#removeNamedTopology` future prematurely, and moving on to try and clear out the topology's state directory while there was a StreamThread somewhere that was continuing to process/close its tasks.
   
   I believe this is due to updating the thread's topology version _before_ we perform the actual topology update, in this case specifically the act of e.g. clearing out a directory. If one thread updates its version and then goes to perform the topology removal/cleanup while a second thread finishes its own topology removal, that second thread will check whether all threads are on the latest version and complete any waiting futures if so -- which means it can complete the future before the first thread has actually completed the corresponding action. A sketch of the ordering issue is below.
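
   For illustration, a minimal sketch of the ordering fix described above (all names hypothetical; this is not the actual KafkaStreams internals):
   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical sketch: a thread must finish the actual topology update
   // (e.g. deleting the state directory) BEFORE advertising its new version;
   // otherwise another thread that is already up to date may see "all threads
   // on latest version" and complete the caller's future too early.
   public class TopologyVersionExample {
       private final AtomicInteger threadVersion = new AtomicInteger();

       public void applyUpdate(final Runnable performUpdate, final int newVersion) {
           performUpdate.run();           // do the cleanup work first...
           threadVersion.set(newVersion); // ...then publish the version bump
       }

       public boolean caughtUpTo(final int latestVersion) {
           return threadVersion.get() >= latestVersion;
       }
   }
   ```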


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



