[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


artemlivshits commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189805608

   Looked at the failed tests; they seem unrelated and pass locally.





[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r925138410


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1299,17 +1300,79 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
 
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertFalse(res);
+
+            pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+            res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            time.sleep(150);

Review Comment:
   @aiquestion , could you file another PR to remove this line? This is 
unneeded, right?






[jira] [Created] (KAFKA-14088) KafkaChannel memory leak

2022-07-19 Thread Gao Fei (Jira)
Gao Fei created KAFKA-14088:
---

 Summary: KafkaChannel memory leak
 Key: KAFKA-14088
 URL: https://issues.apache.org/jira/browse/KAFKA-14088
 Project: Kafka
  Issue Type: Bug
  Components: network
Affects Versions: 2.2.1
 Environment: Current system environment:
kafka version: 2.2.1
openjdk (openj9): jdk1.8
Heap memory: 6.4GB
MaxDirectSize: 8GB
Total number of topics: about 150, each with about 3 partitions
Reporter: Gao Fei


The Kafka broker reports OutOfMemoryError: Java heap space and OutOfMemoryError: Direct buffer memory at the same time. A memory dump shows that the objects occupying the most space are KafkaChannel -> NetworkReceive -> HeapByteBuffer: there are about 4 such KafkaChannels, each holding around 1.5GB, while the total heap allocation is only 6.4GB.
It is strange that a single KafkaChannel occupies so much heap memory. Isn't each batch request written to disk gradually by the RequestHandler threads? Normally this memory in KafkaChannel should be released continuously, but it is not.
Why is there such a large HeapByteBuffer object in KafkaChannel, and what does it store? Shouldn't the socket communication here mostly use direct memory? Why is so much heap memory used instead, and why is it never released?
The business data volume is not very large, and it differs per customer: some customers hit this OOM in their environments, while some customers with larger data volumes do not.

java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:745)

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:745)
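
For context on the HeapByteBuffer in the second trace: NetworkReceive first reads a 4-byte size prefix from the socket, then allocates a heap buffer of that size (from the MemoryPool, per the trace) to hold the entire request body; the buffer stays referenced by the channel until the request is fully read and processed. A simplified sketch of that pattern, not the actual Kafka source:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Simplified sketch of the size-prefixed read visible at
// NetworkReceive.readFrom(NetworkReceive.java:112) in the heap-space trace.
class SimplifiedReceive {
    private final ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
    private ByteBuffer body; // held by the channel until the request completes

    long readFrom(ReadableByteChannel channel) throws IOException {
        long read = 0;
        if (sizeBuffer.hasRemaining()) {
            read += Math.max(0, channel.read(sizeBuffer));
            if (!sizeBuffer.hasRemaining()) {
                sizeBuffer.rewind();
                int requestSize = sizeBuffer.getInt();
                // One heap allocation per in-flight request: this is where a
                // large size prefix turns into a large HeapByteBuffer.
                body = ByteBuffer.allocate(requestSize);
            }
        }
        if (body != null)
            read += Math.max(0, channel.read(body));
        return read;
    }
}
```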





[jira] [Resolved] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-14024.
---
Resolution: Fixed

> Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
> --
>
> Key: KAFKA-14024
> URL: https://issues.apache.org/jira/browse/KAFKA-14024
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.2.0
>Reporter: Shawn Wang
>Assignee: Shawn Wang
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.1
>
>
> Hi,
> In https://issues.apache.org/jira/browse/KAFKA-13310 we tried to fix an issue where consumer#poll(duration) could return well after the provided duration. That happened because, when a rebalance is needed, we first commit the current offsets synchronously before rebalancing, and if the offset commit takes too long, consumer#poll spends more time than the provided duration. To fix that, we replaced the synchronous commit with an asynchronous commit before the rebalance (i.e. in onPrepareJoin).
>  
> However, in this ticket we found that the async commit keeps sending a new commit request on each Consumer#poll, because the offset commit never completes in time. The impact is that the existing consumer gets kicked out of the group after the rebalance timeout without having rejoined. That is, suppose we have consumer A in group G; when consumer B joins the group, only consumer B is in the group after the rebalance.
>  
> The workaround for this issue is to switch back to an eager assignor, e.g. StickyAssignor or RoundRobinAssignor.
>  
> To fix the issue, we came up with 2 solutions:
>  # we can explicitly wait for the async commit to complete in onPrepareJoin, but that would let the KAFKA-13310 issue happen again.
>  # we can keep the async commit offset future in flight, so that on each Consumer#poll we wait for that same future to complete.
>  
> Besides, another bug was found while fixing this one. Before KAFKA-13310, we committed offsets synchronously with the rebalanceTimeout, retrying on retriable errors until the timeout. After KAFKA-13310, we thought we still retried, but we actually retry only after the partitions have been revoked. That is, even if the retried offset commit succeeds, it still leaves some partition offsets uncommitted, and after the rebalance other consumers will consume overlapping records.
>  
>  
> ===
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]
>  
> We didn't wait for the client to receive the commit offset response here, so onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance, and the client will loop invoking onJoinPrepare.
> I think the EAGER mode doesn't have this problem because it revokes the partitions even if onJoinPrepareAsyncCommitCompleted=false and will not try to commit again in the next round.
> reproduce:
>  * single-node Kafka version 3.2.0 && client version 3.2.0
>  * topic1 has 5 partitions
>  * start a consumer1 (cooperative rebalance)
>  * start another consumer2 (same consumer group)
>  * consumer1 will hang for a long time before re-joining
>  * from the server log, consumer1 hits the rebalance timeout before joinGroup and re-joins with another memberId
> consumer1's log keeps printing:
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 (ConsumerCoordinator.java:739)
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} (ConsumerCoordinator.java:1143)
>  
> and the coordinator's log:
> [2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 56 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic members who haven't joined: Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx generation 57 (__consumer_offsets-30) 
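
The eager-assignor workaround mentioned above is a one-line consumer configuration change; a minimal sketch (bootstrap server and group id are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

class EagerAssignorWorkaround {
    // Workaround for KAFKA-14024 on 3.2.0 clients: switch from the cooperative
    // assignor back to an eager one (StickyAssignor or RoundRobinAssignor).
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-g");                 // placeholder
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                RoundRobinAssignor.class.getName());
        return props;
    }
}
```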

[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568783#comment-17568783
 ] 

Luke Chen commented on KAFKA-14024:
---

> In this way we would not need a separate timer inside the `onJoinPrepare` for the commit itself.

[~guozhang], thanks for the suggestion. Yes, that looks simpler! I like it. But since the release date is approaching and the new rebalance protocol (KIP-848) is coming soon, I'm going to merge it as is. Again, thanks for the comment; I learned something from it.

 

[~aiquestion], thanks again for finding the issue and for the PR!


[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189724526

   Backported to 3.3 and 3.2. cc @jsancio @mumrah 





[GitHub] [kafka] showuon merged pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


showuon merged PR #12349:
URL: https://github.com/apache/kafka/pull/12349





[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189713426

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / kafka.server.DynamicBrokerReconfigurationTest.testKeyStoreAlter()
   ```





[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


artemlivshits commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189712085

   > Are the test failures related to the PR?
   
   Yes, just pushed the fix.





[GitHub] [kafka] junrao commented on pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


junrao commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189657885

   @artemlivshits : Are the test failures related to the PR?
   
   @ijuma : Do you have any other comments?





[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-19 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034851


##
clients/src/main/java/org/apache/kafka/common/network/Selector.java:
##
@@ -652,6 +652,11 @@ void write(KafkaChannel channel) throws IOException {
         if (send != null) {
             this.completedSends.add(send);
             this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
+
+            // To fix KAFKA-13559, i.e. select(timeout) blocks for 300 ms when the socket has no data
+            // but the buffer has data. This only happens when using SSL.
+            if (channel.hasBytesBuffered())
+                madeReadProgressLastPoll = true;

Review Comment:
   PR updated, so I'll resolve this comment.






[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-19 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034643



Review Comment:
   @ijuma I have moved the fix to `pollSelectionKeys` as I mentioned before to 
make it less weird (hopefully).
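
For readers following the thread: Selector.poll() chooses how long select() may block based on whether the previous poll made read progress and whether data is still buffered. A simplified sketch of that decision, not the actual Selector code:

```java
class PollTimeoutSketch {
    // Simplified sketch of the poll-timeout decision behind KAFKA-13559: when
    // madeReadProgressLastPoll is false but a complete request still sits in
    // SslTransportLayer.appReadBuffer, the timeout is not zeroed and select()
    // blocks for the full timeout (the observed 300 ms stall).
    static long pollTimeout(boolean madeReadProgressLastPoll, boolean dataInBuffers, long requestedTimeoutMs) {
        if (madeReadProgressLastPoll && dataInBuffers)
            return 0L;                 // buffered data is processed immediately
        return requestedTimeoutMs;     // otherwise block waiting for socket activity
    }
}
```

The fix quoted earlier in the thread sidesteps the stall by recording read progress whenever the channel still has buffered bytes, so the next poll() sees madeReadProgressLastPoll == true and does not block.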






[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-19 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034172


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
 
+  /**
+   * Test for KAFKA-13559 - processing a request got delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. The sequence of events
+   * that leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request from the socket and puts them in the buffer
+   * (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer instead of the socket, so "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following the "makeSocketWithBufferedRequests()" method: it puts two requests
+   * directly into SslTransportLayer.netReadBuffer and manually triggers the processing.
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+//    dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]

Review Comment:
   Done, PR updated.









[GitHub] [kafka] kirktrue closed pull request #12327: WIP - DO NOT MERGE - KIP-714 #718

2022-07-19 Thread GitBox


kirktrue closed pull request #12327: WIP - DO NOT MERGE - KIP-714 #718
URL: https://github.com/apache/kafka/pull/12327





[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-19 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r924957846


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2;

Review Comment:
   @mjsax until we figure this out, we shouldn't merge this PR.






[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-19 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r924957060



Review Comment:
   The test takes at least 20 seconds to run. This makes me wonder whether the removal from the consumer group is working properly.
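
For context, KafkaStreams.CloseOptions, the API under test here, lets close() both bound how long it blocks and make the client leave the consumer group immediately instead of waiting out the session timeout. A minimal usage sketch, assuming a running KafkaStreams instance:

```java
import java.time.Duration;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.CloseOptions;

class ShutdownSketch {
    // Closing with leaveGroup(true) should remove the member from the consumer
    // group right away, which is what waitForEmptyConsumerGroup() in this test
    // waits to observe.
    static void shutdown(KafkaStreams streams) {
        CloseOptions options = new CloseOptions()
                .timeout(Duration.ofSeconds(30)) // bound how long close() blocks
                .leaveGroup(true);               // leave the group instead of waiting for the session timeout
        streams.close(options);
    }
}
```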






[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568732#comment-17568732
 ] 

Guozhang Wang commented on KAFKA-14024:
---

Thanks to [~aiquestion] for filing this and submitting the PR; I've added you as a contributor and assigned the ticket to you as well.


[jira] [Updated] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-14024:
--
Reviewer: Luke Chen


[jira] [Assigned] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-14024:
-

Assignee: Guozhang Wang


[jira] [Assigned] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-14024:
-

Assignee: Shawn Wang  (was: Guozhang Wang)


[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare

2022-07-19 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568731#comment-17568731
 ] 

Guozhang Wang commented on KAFKA-14024:
---

Hello [~mumrah], I took a look at the ticket and the PR (https://github.com/apache/kafka/pull/12349/files) as well, and I agree with [~showuon] that this is a pretty bad regression that we should consider fixing asap, hence worth treating as a blocker for 3.2.1.

As for the PR, personally I'd simplify it a bit compared to the current fix, by making `onJoinPrepare` more re-entrant and idempotent. More specifically: when the caller thread of `poll` enters `onJoinPrepare`, it checks whether there's already a commit in flight and whether it has completed; if not, it sends out the request and returns from `onJoinPrepare` immediately, and hence from the `poll` call as well. The next `poll` call re-enters `onJoinPrepare` and checks whether the commit request has completed; only once the maintained commit future has completed does it continue within the function to revoke partitions, trigger callbacks, etc. In this way we would not need a separate timer inside `onJoinPrepare` for the commit itself. But since [~showuon] is almost done reviewing it, I'd rather leave it to him than block merging it.

In the new rebalance protocol (KIP-848) we will have a much simpler model on the client side, so hopefully we will not fall into this awkward design pattern any more.
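
A rough sketch of the re-entrant shape described above (hypothetical field and helper names, using a generic future type; not the code that was merged):

```java
import java.util.concurrent.CompletableFuture;

class OnJoinPrepareSketch {
    // Hypothetical stand-in for the coordinator's maintained async-commit future.
    private CompletableFuture<Void> inflightCommit = null;

    boolean onJoinPrepare() {
        if (inflightCommit == null) {
            inflightCommit = commitOffsetsAsync(); // assumed helper; returns immediately
            return false;                          // poll() returns without blocking
        }
        if (!inflightCommit.isDone())
            return false;                          // still in flight; re-check on the next poll()
        boolean succeeded = !inflightCommit.isCompletedExceptionally();
        inflightCommit = null;                     // reset once the outcome is consumed
        if (succeeded)
            revokePartitionsAndTriggerCallbacks(); // assumed helper
        return succeeded;                          // proceed with the join only on success
    }

    private CompletableFuture<Void> commitOffsetsAsync() { return new CompletableFuture<>(); } // stub
    private void revokePartitionsAndTriggerCallbacks() { }                                     // stub
}
```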

> Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
> --
>
> Key: KAFKA-14024
> URL: https://issues.apache.org/jira/browse/KAFKA-14024
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.2.0
>Reporter: Shawn Wang
>Priority: Blocker
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.1
>
>
> Hi 
> In https://issues.apache.org/jira/browse/KAFKA-13310 we tried to fix an issue 
> where consumer#poll(duration) returned only after longer than the provided 
> duration. That happened because, when a rebalance was needed, we committed the 
> current offsets synchronously before rebalancing, and if the offset commit 
> took too long, consumer#poll would spend more time than the provided duration. 
> To fix that, we changed the commit before the rebalance (i.e. in 
> onJoinPrepare) from sync to async.
>  
> However, in this ticket, we found the async commit will keep sending a new 
> commit request during each Consumer#poll, because the offset commit never 
> completes in time. The impact is that the existing consumer will be kicked 
> out of the group after the rebalance timeout without rejoining. That is, 
> suppose we have consumer A in group G, and consumer B now joins the group; 
> after the rebalance, only consumer B is left in the group.
>  
> The workaround for this issue is to switch back to the eager assignors, e.g. 
> StickyAssignor or RoundRobinAssignor.
>  
> To fix the issue, we came up with 2 solutions:
>  # we can explicitly wait for the async commit to complete in onJoinPrepare, 
> but that would let the KAFKA-13310 issue happen again.
>  # we can keep the async offset commit future in flight across calls, so that 
> on each Consumer#poll we check whether the future has completed.
>  
> Besides, another bug was found while fixing this one. Before KAFKA-13310, we 
> committed offsets synchronously with the rebalanceTimeout, retrying on 
> retriable errors until the timeout. After KAFKA-13310, we thought we still 
> retried, but we actually retry only after revoking partitions. That is, even 
> if the retried offset commit eventually succeeds, some partition offsets are 
> left un-committed, and after the rebalance other consumers will consume 
> overlapping records.
>  
>  
> ===
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]
>  
> we didn't wait for the client to receive the commit offset response here, so 
> onJoinPrepareAsyncCommitCompleted will be false in a cooperative rebalance, 
> and the client will loop invoking onJoinPrepare.
> I think the EAGER mode doesn't have this problem because it will revoke the 
> partitions even if onJoinPrepareAsyncCommitCompleted=false and will not try 
> to commit in the next round.
> reproduce:
>  * single node Kafka version 3.2.0 && client version 3.2.0
>  * topic1 has 5 partitions
>  * start a consumer1 (cooperative rebalance)
>  * start another consumer2 (same consumer group)
>  * consumer1 will hang for a long time before re-joining
>  * from the server log, consumer1 hits the rebalance timeout before joinGroup 
> and re-joins with another memberId
> consumer1's log keeps printing:
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> 

[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-07-19 Thread GitBox


jsancio commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1189538831

   > @jsancio I think the last question is that there may be more than one 
registration record for each broker after restarting it, so can we rely on the 
broker epoch? I think I need some time to check the logic before making a 
final decision.
   
   Yes. I think you can rely on broker epoch and broker id. Also the active 
controller is guaranteed to have read all of the records on the log before 
handling RPCs like heartbeat.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #12324: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used

2022-07-19 Thread GitBox


ableegoldman commented on PR #12324:
URL: https://github.com/apache/kafka/pull/12324#issuecomment-1189528697

   Also cherry-picked back to the 3.3 branch, since the code freeze deadline 
is tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #12324: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used

2022-07-19 Thread GitBox


ableegoldman commented on PR #12324:
URL: https://github.com/apache/kafka/pull/12324#issuecomment-1189526095

   Merged to trunk, thanks @wcarlson5 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman merged pull request #12324: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used

2022-07-19 Thread GitBox


ableegoldman merged PR #12324:
URL: https://github.com/apache/kafka/pull/12324


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #12324: KAFKA-12699: override the default handler for stream threads if the stream's handl…

2022-07-19 Thread GitBox


ableegoldman commented on PR #12324:
URL: https://github.com/apache/kafka/pull/12324#issuecomment-1189524621

   Test failures are unrelated. The ARM build did time out, but I've seen this 
happen on other PRs recently, and the build page shows the actual test steps 
passing (the error message in the failed step is just `[Checks API] No 
suitable checks publisher found`), so I believe it to be unrelated as well. 
Seems good to merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-19 Thread GitBox


badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r924908020


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,97 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
+   * Tests for KAFKA-13559 - Processing a request got delayed by 300 ms under the following conditions:
+   * 1. Client-server communication uses an SSL socket.
+   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. The sequence of events that
+   * leads to this situation is the following (from the server's point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one request on the socket and puts them in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores them in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following the "makeSocketWithBufferedRequests()" method, by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually triggering the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+// to ensure we only have 1 connection (channel)
+val props = sslServerProps
+val numConnections = 1
+props.put("max.connections.per.ip", numConnections.toString)
+
+// create server with SSL listener
+val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+testableServer.enableRequestProcessing(Map.empty)
+//   
dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]
+val testableSelector = testableServer.testableSelector
+val proxyServer = new ProxyServer(testableServer)
+
+try {
+  // trigger SSL handshake by sending the first request and receiving its 
response without buffering
+  val requestBytes = producerRequestBytes()
+  val sslSocket = sslClientSocket(proxyServer.localPort)
+
+  sendRequest(sslSocket, requestBytes)
+  val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+  processRequest(testableServer.dataPlaneRequestChannel, request1)
+  receiveResponse(sslSocket)
+
+  // then put 2 requests in SslTransportLayer.netReadBuffer via the 
ProxyServer
+  val connectionId = request1.context.connectionId
+  val listener = 
testableServer.config.dataPlaneListeners.head.listenerName.value
+  val channel = 
testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw
 new IllegalStateException("Channel not found"))
+  val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, 
classOf[KafkaChannel], "transportLayer")
+  val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, 
classOf[SslTransportLayer], "netReadBuffer")
+
+  proxyServer.enableBuffering(netReadBuffer)
+  sendRequest(sslSocket, requestBytes)
+  sendRequest(sslSocket, requestBytes)
+
+  val keysWithBufferedRead: util.Set[SelectionKey] = 
JTestUtils.fieldValue(testableSelector, classOf[Selector], 
"keysWithBufferedRead")
+  keysWithBufferedRead.add(channel.selectionKey)
+  JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+  // process the first request in the server side
+  // this would move bytes from netReadBuffer to appReadBuffer, then 
process only the first request
+  processRequest(testableServer.dataPlaneRequestChannel)
+
+  // receive response in the client side
+  receiveResponse(sslSocket)
+
+  // process the second request in the server side
+  // this would process the second request in the appReadBuffer
+  // NOTE: this should not block because the data is already in the 
buffer, but without the fix for KAFKA-13559,
+  // this step will take more than 300 ms
+  val processTimeStart = System.currentTimeMillis()
+  processRequest(testableServer.dataPlaneRequestChannel)
+  val processTimeEnd = System.currentTimeMillis()
+
+  // receive response in the client side
+  receiveResponse(sslSocket)

Review Comment:
   @splett2 Can you refresh the PR? I have added timeout override and changed 
currentTimeMillis to nanoTime. So this comment is against an older version of 
the PR.
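
   For context, the essence of the fix that the test above guards is that a 
selector must not block when a channel still has bytes buffered in its 
transport layer. A rough sketch of that idea with plain NIO (not the actual 
Kafka Selector code):

{code:java}
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Set;

// Rough sketch: buffered bytes never wake up select(), so poll with timeout 0
// whenever any channel still has data buffered inside the SSL transport layer.
final class BufferAwarePollSketch {
    private final Selector nioSelector;
    private final Set<SelectionKey> keysWithBufferedRead; // hypothetical bookkeeping

    BufferAwarePollSketch(Selector nioSelector, Set<SelectionKey> keysWithBufferedRead) {
        this.nioSelector = nioSelector;
        this.keysWithBufferedRead = keysWithBufferedRead;
    }

    int poll(long timeoutMs) throws IOException {
        // Data sitting in a buffer will never trigger the selector, so don't block on it.
        long effectiveTimeout = keysWithBufferedRead.isEmpty() ? timeoutMs : 0L;
        return effectiveTimeout == 0L ? nioSelector.selectNow()
                                      : nioSelector.select(effectiveTimeout);
    }
}
{code}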



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To 

[jira] [Assigned] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2022-07-19 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-12463:
-

Assignee: (was: Chris Egerton)

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters." As Connect and 
> the tooling around it matures and automatic restarts of failed tasks become 
> more popular, care should be taken to ensure that the consumer group churn 
> created by restarting one or more tasks doesn't compromise the availability 
> of other tasks by forcing them to temporarily yield up all of their 
> partitions just to reclaim them after a rebalance has completed.
> With that in mind, we should alter the default consumer configuration for 
> sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this 
> in a backwards-compatible fashion that also enables rolling upgrades, this 
> should be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically.
> Importantly, this setting will only be a default, and any user-specified 
> overrides either in the *worker config*:
>  
> {code:java}
> consumer.partition.assignment.strategy=<desired strategy>{code}
>  
> or in the *connector config*:
>  
> {code:java}
> "consumer.override.partition.assignment.strategy": " strategy>"{code}
>  
> will still be respected.
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, 

[jira] [Assigned] (KAFKA-12476) Worker can block for longer than scheduled rebalance delay and/or session key TTL

2022-07-19 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-12476:
-

Assignee: (was: Chris Egerton)

> Worker can block for longer than scheduled rebalance delay and/or session key 
> TTL
> -
>
> Key: KAFKA-12476
> URL: https://issues.apache.org/jira/browse/KAFKA-12476
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.2, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2, 3.0.0
>Reporter: Chris Egerton
>Priority: Major
>
> Near the end of a distributed worker's herder tick loop, it calculates how 
> long it should poll for rebalance activity before beginning a new loop. See 
> [here|https://github.com/apache/kafka/blob/8da65936d7fc53d24c665c0d01893d25a430933b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L399-L409]
>  and 
> [here|https://github.com/apache/kafka/blob/8da65936d7fc53d24c665c0d01893d25a430933b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L459].
> In between then and when it begins polling for rebalancing activity, some 
> connector and task (re-)starts take place. While this normally completes in 
> at most a minute or two, an overloaded cluster or one in the midst of garbage 
> collection may take longer. See 
> [here|https://github.com/apache/kafka/blob/8da65936d7fc53d24c665c0d01893d25a430933b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L411-L452].
> The worker should calculate the time to poll for rebalance activity as 
> closely as possible to when it actually begins that polling.
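
To sketch the failure mode (hypothetical names, not the actual 
DistributedHerder code):

{code:java}
// Illustrative sketch: computing the poll duration before slow work runs means
// the worker can block for (slow work + full duration), overshooting the
// scheduled rebalance delay or session key TTL.
final class TickLoopSketch {
    interface Member { void poll(long timeoutMs); }

    private final Member member;

    TickLoopSketch(Member member) { this.member = member; }

    void tickBuggy() {
        long pollDurationMs = timeUntilNextCheck(); // computed too early...
        startAndStopConnectorsAndTasks();           // ...slow work happens here
        member.poll(pollDurationMs);                // total block can exceed the delay
    }

    void tickFixed() {
        startAndStopConnectorsAndTasks();
        member.poll(timeUntilNextCheck());          // recompute right before polling
    }

    private long timeUntilNextCheck() { return 1_000L; }  // stand-in
    private void startAndStopConnectorsAndTasks() { }     // stand-in
}
{code}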



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-8299) Add type-safe instantiation of generic classes to AbstractConfig

2022-07-19 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-8299:


Assignee: (was: Chris Egerton)

> Add type-safe instantiation of generic classes to AbstractConfig
> 
>
> Key: KAFKA-8299
> URL: https://issues.apache.org/jira/browse/KAFKA-8299
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Chris Egerton
>Priority: Minor
>
> {{AbstractConfig.getConfiguredInstance(String key, Class<T> klass)}} and 
> other similar methods aren't type-safe for generic types. For example, the 
> following code compiles but generates a runtime exception when the created 
> {{Consumer}} is invoked:
>  
> {code:java}
> public class KafkaIssueSnippet {
>     public static class PrintInt implements Consumer<Integer> {
>         @Override
>         public void accept(Integer i) {
>             System.out.println(i);
>         }
>     }
>
>     public static void main(String[] args) {
>         final String stringConsumerProp = "string.consumer.class";
>         AbstractConfig config = new AbstractConfig(
>             new ConfigDef().define(
>                 stringConsumerProp,
>                 ConfigDef.Type.CLASS,
>                 ConfigDef.Importance.HIGH,
>                 "A class that implements Consumer<String>"
>             ),
>             Collections.singletonMap(
>                 stringConsumerProp,
>                 PrintInt.class.getName()
>             )
>         );
>         Consumer<String> stringConsumer = config.getConfiguredInstance(
>             stringConsumerProp,
>             Consumer.class
>         );
>         stringConsumer.accept("Oops! ClassCastException");
>     }
> }{code}
> The compiler (rightfully so) generates a warning about the unchecked cast 
> from {{Consumer}} to {{Consumer<String>}} to indicate that exactly this sort 
> of thing may happen, but it would be nice if we didn't have to worry about 
> this in the first place and instead had the same guarantees for generic types 
> that we do for non-generic types: that either the 
> {{getConfiguredInstance(...)}} method returns an object to us that we know 
> for sure is an instance of the requested type, or an exception is thrown.
> Apache Commons contains a useful reflection library that could possibly be 
> used to bridge this gap; specifically, its 
> [TypeUtils|https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/reflect/TypeUtils.html]
>  and 
> [TypeLiteral|https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/reflect/TypeLiteral.html]
>  classes could be used to add new {{getConfiguredInstance}} and 
> {{getConfiguredInstances}} methods to the {{AbstractConfig}} class that 
> accept instances of {{TypeLiteral}} instead of {{Class}} and then perform 
> type checking to ensure that the requested class actually implements/extends 
> from the requested type.
> Since this affects public API it's possible a KIP will be required, but the 
> changes are pretty lightweight (four new methods that heavily resemble 
> existing ones). If a contributor or committer, especially one familiar with 
> this section of the codebase, has an opinion on the necessity of a KIP their 
> input would be appreciated.
>  
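
To make the proposal concrete, a rough sketch (not from the ticket) of the 
generic-aware check such a method could perform with commons-lang3:

{code:java}
import org.apache.commons.lang3.reflect.TypeLiteral;
import org.apache.commons.lang3.reflect.TypeUtils;

// Rough sketch, assuming commons-lang3 on the classpath: TypeUtils.isInstance
// resolves type arguments declared by the instance's class, so a PrintInt
// (which implements Consumer<Integer>) fails a check against Consumer<String>.
public final class TypeSafeConfigSketch {
    @SuppressWarnings("unchecked")
    public static <T> T checkedInstance(Object instance, TypeLiteral<T> type) {
        if (instance != null && !TypeUtils.isInstance(instance, type.getType())) {
            throw new ClassCastException(instance.getClass().getName()
                    + " is not an instance of " + type.getType());
        }
        return (T) instance;
    }
}
{code}

Usage would look like checkedInstance(obj, new TypeLiteral<Consumer<String>>() {}), 
which throws up front instead of deferring the ClassCastException to the first 
accept() call.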



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-19 Thread GitBox


C0urante commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r924890270


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -144,10 +149,6 @@ public void subscribe(String path, Set keys, 
ConfigChangeCallback callba
 throw new UnsupportedOperationException();
 }
 
-public void unsubscribe(String path, Set keys) {
-throw new UnsupportedOperationException();
-}
-

Review Comment:
   Nice catch, took me a second to figure out the misalignment between the type 
signatures for this method and for `ConfigProvider::unsubscribe`.
   
   Should we also remove the `subscribe` method, since it matches the [default 
implementation](https://github.com/apache/kafka/blob/693e283802590b724ef441d5bf7acb6eeced91c5/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java#L59-L61)
 for `ConfigProvider::subscribe`?
   
   Also, should we annotate any remaining methods with `@Override` to help 
prevent future mistakes like the one you've caught here?



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest {
 public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
 public static final String TEST_RESULT_WITH_LONGER_TTL = 
"testResultWithLongerTTL";
 
-@Mock private Herder herder;
-@Mock private Worker worker;
-@Mock private HerderRequest requestId;
+private final Herder herder = Mockito.mock(Herder.class);
+private final Worker worker = Mockito.mock(Worker.class);
+private final HerderRequest requestId = Mockito.mock(HerderRequest.class);
 private WorkerConfigTransformer configTransformer;
 
 @Before
 public void setup() {
-worker = PowerMock.createMock(Worker.class);
-herder = PowerMock.createMock(Herder.class);
 configTransformer = new WorkerConfigTransformer(worker, 
Collections.singletonMap("test", new TestConfigProvider()));
 }
 
 @Test
 public void testReplaceVariable() {
+// Execution
 Map result = configTransformer.transform(MY_CONNECTOR, 
Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+
+// Assertions
 assertEquals(TEST_RESULT, result.get(MY_KEY));
 }
 
 @Test
 public void testReplaceVariableWithTTL() {
-EasyMock.expect(worker.herder()).andReturn(herder);
-
-replayAll();
+// Setup
+when(worker.herder()).thenReturn(herder);
 
+// Execution
 Map props = new HashMap<>();
 props.put(MY_KEY, "${test:testPath:testKeyWithTTL}");
 props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE);
 Map result = configTransformer.transform(MY_CONNECTOR, 
props);
+
+// Assertions
+assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));

Review Comment:
   Good catch 



##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java:
##
@@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest {
 public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL";
 public static final String TEST_RESULT_WITH_LONGER_TTL = 
"testResultWithLongerTTL";
 
-@Mock private Herder herder;
-@Mock private Worker worker;
-@Mock private HerderRequest requestId;
+private final Herder herder = Mockito.mock(Herder.class);
+private final Worker worker = Mockito.mock(Worker.class);
+private final HerderRequest requestId = Mockito.mock(HerderRequest.class);
 private WorkerConfigTransformer configTransformer;
 
 @Before
 public void setup() {
-worker = PowerMock.createMock(Worker.class);
-herder = PowerMock.createMock(Herder.class);
 configTransformer = new WorkerConfigTransformer(worker, 
Collections.singletonMap("test", new TestConfigProvider()));
 }
 
 @Test
 public void testReplaceVariable() {
+// Execution
 Map result = configTransformer.transform(MY_CONNECTOR, 
Collections.singletonMap(MY_KEY, "${test:testPath:testKey}"));
+
+// Assertions
 assertEquals(TEST_RESULT, result.get(MY_KEY));
 }
 
 @Test
 public void testReplaceVariableWithTTL() {
-EasyMock.expect(worker.herder()).andReturn(herder);
-
-replayAll();
+// Setup
+when(worker.herder()).thenReturn(herder);
 
+// Execution
 Map props = new HashMap<>();
 props.put(MY_KEY, "${test:testPath:testKeyWithTTL}");
 props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE);
 Map result = configTransformer.transform(MY_CONNECTOR, 
props);
+
+// Assertions
+assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY));
 }
 
 @Test
 public void 

[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-10000: System tests (KIP-618)

2022-07-19 Thread GitBox


C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1189451124

   I made a small adjustment to the shutdown logic for the tests, which ensures 
that the worker process is actually dead before trying to bring up a new one. 
This should not affect the accuracy of the tests, but it did solve the 
above-mentioned issue with workers failing to restart because port 8083 on 
their container was still in use.
   
   I also corrected a small rebase error caused by the recent changes to remove 
the file connectors from the default Connect setup, a test failure caused by 
the new source connector properties, and a minor bug in the 
`VerifiableSourceTask` class that can cause a `NullPointerException` during 
shutdown.
   
   With these changes, I was able to get a complete green run of the set of 
tests under the `tests/kafkatest/tests/connect` directory.
   
   I've also kicked off a local run of the `test_exactly_once_source` test with 
unclean shutdown and the `sessioned` protocol; going to try to let that go for 
the rest of the day on repeat. Will report any non-spurious failures.
   
   @showuon if you have time, would you mind giving a local run of these tests 
another try?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-19 Thread GitBox


mjsax commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r924838158


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends 
AbstractResetIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+protected static final int TIMEOUT_MULTIPLIER = 15;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 

[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924818200


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1465,13 +1465,21 @@ public boolean isDone() {
 private class AppendCallbacks implements 
RecordAccumulator.AppendCallbacks {
 private final Callback userCallback;
 private final ProducerInterceptors<K, V> interceptors;
-private final ProducerRecord<K, V> record;
-protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+private final String topic;
+private final Integer recordPartition;
+private final String recordLogString;
+private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+private volatile TopicPartition topicPartition;
 
 private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
 this.userCallback = userCallback;
 this.interceptors = interceptors;
-this.record = record;
+// Extract record info as we don't want to keep a reference to the record during
+// the whole lifetime of the batch.

Review Comment:
   Thanks for the explanation. Sounds good.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-07-19 Thread GitBox


ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1189398064

   @dengziming I have made the changes requested. I have also made the changes 
in `QuorumController.maybeGenerateSnapshot` to log the reason for the snapshot 
being generated. 
   
   `maybeGenerateSnapshot` seemed to be a better place than 
`QuorumController.SnapshotGeneratorManager.createSnapshotGenerator` since it 
already had logs for starting a snapshot.
   
   Also, it seems like `QuorumController` only creates a snapshot when max 
bytes are exceeded; I couldn't find it calling the snapshot generator for the 
metadata-version-changed reason. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot

2022-07-19 Thread GitBox


ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r924807677


##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -89,6 +89,12 @@ class BrokerMetadataListener(
*/
   private var _bytesSinceLastSnapshot: Long = 0L
 
+  /**
+   * The reason as to why we are calling maybeStartSnapshot, can be either
+   * MaxBytesExceeded or MetadataVersionChanged
+   */
+  private var _reasonForSnapshot: String = ""

Review Comment:
   Thanks, I have made the change. I have handled it by returning a `Tuple` 
from the `shouldSnapshot` function.
   I am new to Scala, if there is another recommended way of doing it please 
let me know!
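
   The shape of that change can be sketched like this (a Java rendering of the 
idea with illustrative names; the actual change is in Scala):

{code:java}
import java.util.Optional;

// Illustrative sketch: return the snapshot reason together with the decision
// instead of stashing it in a mutable field.
final class SnapshotPolicySketch {
    private final long maxBytesBetweenSnapshots;
    private long bytesSinceLastSnapshot;
    private boolean metadataVersionChanged;

    SnapshotPolicySketch(long maxBytesBetweenSnapshots) {
        this.maxBytesBetweenSnapshots = maxBytesBetweenSnapshots;
    }

    // Empty means "no snapshot needed"; otherwise the value carries the reason.
    Optional<String> shouldSnapshot() {
        if (bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
            return Optional.of("max bytes exceeded");
        } else if (metadataVersionChanged) {
            return Optional.of("metadata version changed");
        }
        return Optional.empty();
    }
}
{code}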



##
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##
@@ -128,7 +134,15 @@ class BrokerMetadataListener(
   }
 
   private def shouldSnapshot(): Boolean = {
-(_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || 
metadataVersionChanged()
+if (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
+  _reasonForSnapshot = "MaxBytesExceeded"

Review Comment:
   Got it, made this change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12391: [DO NOT MERGE] KAFKA-10199: Add task updater metrics

2022-07-19 Thread GitBox


guozhangwang commented on PR #12391:
URL: https://github.com/apache/kafka/pull/12391#issuecomment-1189383814

   @cadonna please lmk wdyt about the proposed metric changes in the 
descriptions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924801253


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1465,13 +1465,21 @@ public boolean isDone() {
 private class AppendCallbacks implements 
RecordAccumulator.AppendCallbacks {
 private final Callback userCallback;
 private final ProducerInterceptors<K, V> interceptors;
-private final ProducerRecord<K, V> record;
-protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+private final String topic;
+private final Integer recordPartition;
+private final String recordLogString;
+private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+private volatile TopicPartition topicPartition;
 
 private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
 this.userCallback = userCallback;
 this.interceptors = interceptors;
-this.record = record;
+// Extract record info as we don't want to keep a reference to the record during
+// the whole lifetime of the batch.

Review Comment:
   I think it applies to all 3 fields: topic, recordPartition and 
recordLogString - we extract all this info from the record, so the comment is 
before we do that (in the PR it's kind of hard to see because of the inline 
discussion).  Let me know if you think otherwise.
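
   In other words, the callback captures a few cheap derived values instead of 
the record itself. A simplified illustration (not the exact PR code):

{code:java}
import org.apache.kafka.clients.producer.ProducerRecord;

// Simplified illustration: keep only what the callback needs so the
// (potentially large) record can be garbage-collected while the batch
// is still in flight.
final class RecordInfoSketch {
    final String topic;
    final Integer recordPartition;
    final String recordLogString;

    RecordInfoSketch(ProducerRecord<?, ?> record, boolean traceEnabled) {
        this.topic = record != null ? record.topic() : null;
        this.recordPartition = record != null ? record.partition() : null;
        // Rendering record.toString() is only worth the cost when tracing.
        this.recordLogString = traceEnabled && record != null ? record.toString() : "";
    }
}
{code}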



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #12421: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handle…

2022-07-19 Thread GitBox


mjsax commented on PR #12421:
URL: https://github.com/apache/kafka/pull/12421#issuecomment-1189369026

   Thanks for the PR.
   
   Merged to `trunk` and cherry-picked to `3.3`, `3.2` branches. (Did not 
cherry-pick cleanly to `3.1` -- can you do a follow up PR to backport it?)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE

2022-07-19 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568659#comment-17568659
 ] 

Matthias J. Sax commented on KAFKA-12887:
-

This feature broke some stuff, and we reverted it: 
[https://github.com/apache/kafka/pull/12421] 

Reverted in 3.3.0, 3.2.1.

> Do not trigger user-customized ExceptionalHandler for RTE
> -
>
> Key: KAFKA-12887
> URL: https://issues.apache.org/jira/browse/KAFKA-12887
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Josep Prat
>Priority: Major
> Fix For: 3.1.0
>
>
> Today in StreamThread we have a try-catch block that captures all {{Throwable 
> e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. 
> However, there are possible RTEs such as IllegalState/IllegalArgument 
> exceptions which are usually caused by bugs. In such cases we should not 
> let users decide what to do with these exceptions, but should let Streams 
> itself enforce the decision; e.g. for IllegalState/IllegalArgument we 
> should fail fast to surface the potential error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions

2022-07-19 Thread GitBox


jnh5y commented on code in PR #12408:
URL: https://github.com/apache/kafka/pull/12408#discussion_r924785404


##
streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java:
##
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest { //extends 
AbstractResetIntegrationTest {
+@Rule
+public Timeout globalTimeout = Timeout.seconds(600);
+@Rule
+public final TestName testName = new TestName();
+private static MockTime mockTime;
+
+@Rule
+public final TemporaryFolder testFolder = new 
TemporaryFolder(TestUtils.tempDirectory());
+
+protected static final String INPUT_TOPIC = "inputTopic";
+protected static final String OUTPUT_TOPIC = "outputTopic";
+private static final String OUTPUT_TOPIC_2 = "outputTopic2";
+private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun";
+
+protected static final int STREAMS_CONSUMER_TIMEOUT = 2000;
+protected static final int TIMEOUT_MULTIPLIER = 15;
+
+protected Properties streamsConfig;
+protected static KafkaStreams streams;
+protected static Admin adminClient;
+protected Properties commonClientConfig;
+private Properties producerConfig;
+protected Properties resultConsumerConfig;
+
+public static final EmbeddedKafkaCluster CLUSTER;
+
+static {
+final Properties brokerProps = new Properties();
+CLUSTER = new EmbeddedKafkaCluster(1, brokerProps);
+}
+
+@BeforeClass
+public static void startCluster() throws IOException {
+CLUSTER.start();
+}
+
+@AfterClass
+public static void closeCluster() {
+CLUSTER.stop();
+}
+
+@Before
+public void before() throws Exception {
+mockTime = CLUSTER.time;
+
+final String appID = 
IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
+
+commonClientConfig = new Properties();
+commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+
+streamsConfig = new Properties();
+streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, 
testFolder.getRoot().getPath());
+streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Long().getClass());
+streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 

[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924779033


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1465,13 +1465,21 @@ public boolean isDone() {
 private class AppendCallbacks implements 
RecordAccumulator.AppendCallbacks {
 private final Callback userCallback;
 private final ProducerInterceptors<K, V> interceptors;
-private final ProducerRecord<K, V> record;
-protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+private final String topic;
+private final Integer recordPartition;
+private final String recordLogString;
+private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+private volatile TopicPartition topicPartition;
 
 private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
 this.userCallback = userCallback;
 this.interceptors = interceptors;
-this.record = record;
+// Extract record info as we don't want to keep a reference to the record during
+// the whole lifetime of the batch.

Review Comment:
   Could we move these two lines to the immediate line before where we set 
recordPartition?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jsancio commented on pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


jsancio commented on PR #12365:
URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189361061

   > @jsancio : We plan to cherry-pick this to 3.3 branch since this fixes a 
performance issue in 
[KAFKA-10888](https://issues.apache.org/jira/browse/KAFKA-10888).
   
   Sounds good @junrao . I set the fix version for KAFKA-14020 to 3.3.0.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14020) Performance regression in Producer

2022-07-19 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-14020:
---
Fix Version/s: 3.3.0

> Performance regression in Producer
> --
>
> Key: KAFKA-14020
> URL: https://issues.apache.org/jira/browse/KAFKA-14020
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.3.0
>Reporter: John Roesler
>Assignee: Artem Livshits
>Priority: Blocker
> Fix For: 3.3.0
>
>
> [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a]
>  introduced a 10% performance regression in the KafkaProducer under a default 
> config.
>  
> The context for this result is a benchmark that we run for Kafka Streams. The 
> benchmark provisions 5 independent AWS clusters, including one broker node on 
> an i3.large and one client node on an i3.large. During a benchmark run, we 
> first run the Producer for 10 minutes to generate test data, and then we run 
> Kafka Streams under a number of configurations to measure its performance.
> Our observation was a 10% regression in throughput under the simplest 
> configuration, in which Streams simply consumes from a topic and does nothing 
> else. That benchmark actually runs faster than the producer that generates 
> the test data, so its thoughput is bounded by the data generator's 
> throughput. After investigation, we realized that the regression was in the 
> data generator, not the consumer or Streams.
> We have numerous benchmark runs leading up to the commit in question, and 
> they all show a throughput in the neighborhood of 115,000 records per second. 
> We also have 40 runs including and after that commit, and they all show a 
> throughput in the neighborhood of 105,000 records per second. A test on 
> [trunk with the commit reverted |https://github.com/apache/kafka/pull/12342] 
> shows a return to around 115,000 records per second.
> Config:
> {code:java}
> final Properties properties = new Properties();
> properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
> properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
> StringSerializer.class);
> {code}
> Here's the producer code in the data generator. Our tests were running with 
> three produceThreads.
> {code:java}
>  for (int t = 0; t < produceThreads; t++) {
> futures.add(executorService.submit(() -> {
> int threadTotal = 0;
> long lastPrint = start;
> final long printInterval = Duration.ofSeconds(10).toMillis();
> long now;
> try (final org.apache.kafka.clients.producer.Producer<String, String> 
> producer = new KafkaProducer<>(producerConfig(broker))) {
> while (limit > (now = System.currentTimeMillis()) - start) {
> for (int i = 0; i < 1000; i++) {
> final String key = keys.next();
> final String data = dataGen.generate();
> producer.send(new ProducerRecord<>(topic, key, 
> valueBuilder.apply(key, data)));
> threadTotal++;
> }
> if ((now - lastPrint) > printInterval) {
> System.out.println(Thread.currentThread().getName() + " 
> produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + 
> Duration.ofMillis(now - start));
> lastPrint = now;
> }
> }
> }
> total.addAndGet(threadTotal);
> System.out.println(Thread.currentThread().getName() + " finished (" + 
> numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start));
> }));
> }{code}
> As you can see, this is a very basic usage.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14087) Add jmh benchmark for producer with MockClient

2022-07-19 Thread Artem Livshits (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artem Livshits updated KAFKA-14087:
---
Description: 
Something like this
{code:java}
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
        configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
        Time time = Time.SYSTEM;
        AtomicInteger offset = new AtomicInteger(0);
        MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2));
        ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE);
        StringBuilder value = new StringBuilder("foo");
        for (int i = 0; i < 1000; i++)
            value.append("x");
        AtomicInteger totalRecords = new AtomicInteger(0);
        long start = time.milliseconds();
        CompletableFuture[] futures = new CompletableFuture[3];
        for (int i = 0; i < futures.length; i++) {
            futures[i] = CompletableFuture.runAsync(() -> {
                ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
                MockClient client = new MockClient(time, metadata) {
                    @Override
                    public void send(ClientRequest request, long now) {
                        super.send(request, now);
                        if (request.apiKey() == ApiKeys.PRODUCE) {
                            // Prepare response data from request.
                            ProduceResponseData responseData = new ProduceResponseData();
                            ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build();
                            produceRequest.data().topicData().forEach(topicData ->
                                    topicData.partitionData().forEach(partitionData -> {
                                        String topic = topicData.name();
                                        ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic);
                                        if (tpr == null) {
                                            tpr = new ProduceResponseData.TopicProduceResponse().setName(topic);
                                            responseData.responses().add(tpr);
                                        }
                                        tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse()
                                                .setIndex(partitionData.index())
                                                .setRecordErrors(Collections.emptyList())
                                                .setBaseOffset(offset.addAndGet(1))
                                                .setLogAppendTimeMs(time.milliseconds())
                                                .setLogStartOffset(0)
                                                .setErrorMessage("")
                                                .setErrorCode(Errors.NONE.code()));
                                    }));
                            // Schedule a reply to come after some time to mock broker latency.
                            executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS);
                        }
                    }
                };
                client.updateMetadata(initialUpdateResponse);
                InitProducerIdResponseData responseData = new InitProducerIdResponseData()
                        .setErrorCode(Errors.NONE.code())
                        .setProducerEpoch((short) 0)
                        .setProducerId(42)
                        .setThrottleTimeMs(0);
                client.prepareResponse(body -> body instanceof InitProducerIdRequest,
                        new InitProducerIdResponse(responseData), false);
                try (KafkaProducer<String, String> producer = kafkaProducer(
                        configs,
                        new StringSerializer(),
                        new StringSerializer(),
                        metadata,
                        client,
                        null,
                        time
                )) {
                    final int records = 20_000_000;
                    for (int k = 0; k < records; k++) {
                        producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString()));
                    }
                    totalRecords.addAndGet(records);
                }
            });
        }
        for (CompletableFuture future : futures) {
            future.get();
        }
{code}
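
To turn the snippet into an actual jmh benchmark, a minimal skeleton along these lines could wrap it (hedged sketch: the class and method names are illustrative and the setup body is elided to the snippet above; annotations are from org.openjdk.jmh):
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.openjdk.jmh.annotations.*;

// Illustrative JMH skeleton only; setup() would build the MockClient and
// producer exactly as in the snippet above.
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
public class ProducerMockClientBenchmark {

    private KafkaProducer<String, String> producer;

    @Setup(Level.Trial)
    public void setup() {
        // Build MockClient + KafkaProducer as in the snippet above.
    }

    @Benchmark
    public void send() {
        producer.send(new ProducerRecord<>("topic", "key", "value"));
    }

    @TearDown(Level.Trial)
    public void tearDown() {
        producer.close();
    }
}
{code}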
 


[jira] [Created] (KAFKA-14087) Add jmh benchmark for producer with MockClient

2022-07-19 Thread Artem Livshits (Jira)
Artem Livshits created KAFKA-14087:
--

 Summary: Add jmh benchmark for producer with MockClient
 Key: KAFKA-14087
 URL: https://issues.apache.org/jira/browse/KAFKA-14087
 Project: Kafka
  Issue Type: Improvement
  Components: producer 
Reporter: Artem Livshits





--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924715457


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1465,13 +1465,17 @@ public boolean isDone() {
 private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
 private final Callback userCallback;
 private final ProducerInterceptors<K, V> interceptors;
-private final ProducerRecord<K, V> record;
+private ProducerRecord<K, V> record;
+private final String topic;
 protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
 private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
 this.userCallback = userCallback;
 this.interceptors = interceptors;
 this.record = record;
+// Note a record would be null only if the client application has a bug, but we don't want to
+// have NPE here, because the interceptors would not be notified (see .doSend).
+topic = record != null ? record.topic() : null;

Review Comment:
   It checks that the exception is thrown and then it checks that interceptors 
are called.  Probably the test is just sloppy and could use a different error 
condition. KAFKA-14086



##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
 if (log.isTraceEnabled()) {
     // Log the message here, because we don't know the partition before that.
-    log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+    log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
 }
+
+// Reset record to null here so that it doesn't have to be alive as long as the batch is.
+record = null;

Review Comment:
   Updated to extract all record info in the constructor.
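
   For reference, a hedged sketch of what that shape looks like (the field names recordPartition and recordLogString are illustrative, not necessarily the exact patch):
{code:java}
// Sketch: extract the few small fields the callback needs eagerly in the
// constructor, so the ProducerRecord itself does not have to stay reachable
// for the whole lifetime of the batch.
private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
    this.userCallback = userCallback;
    this.interceptors = interceptors;
    // The record can now be garbage collected even while the batch
    // (and therefore this callback) is still alive.
    this.topic = record != null ? record.topic() : null;
    this.recordPartition = record != null ? record.partition() : null;
    this.recordLogString = log.isTraceEnabled() && record != null ? record.toString() : "";
}
{code}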



##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
 if (log.isTraceEnabled()) {

Review Comment:
   KAFKA-14085



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14086) Cleanup PlaintextConsumerTest.testInterceptors to not pass null record

2022-07-19 Thread Artem Livshits (Jira)
Artem Livshits created KAFKA-14086:
--

 Summary: Cleanup PlaintextConsumerTest.testInterceptors to not 
pass null record
 Key: KAFKA-14086
 URL: https://issues.apache.org/jira/browse/KAFKA-14086
 Project: Kafka
  Issue Type: Task
Reporter: Artem Livshits


See https://github.com/apache/kafka/pull/12365/files#r919746298



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14085) Clean up usage of asserts in KafkaProducer

2022-07-19 Thread Artem Livshits (Jira)
Artem Livshits created KAFKA-14085:
--

 Summary: Clean up usage of asserts in KafkaProducer
 Key: KAFKA-14085
 URL: https://issues.apache.org/jira/browse/KAFKA-14085
 Project: Kafka
  Issue Type: Task
  Components: producer 
Reporter: Artem Livshits


See https://github.com/apache/kafka/pull/12365/files#r919749970



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on pull request #12387: KAFKA-10199: Add RESUME in state updater

2022-07-19 Thread GitBox


guozhangwang commented on PR #12387:
URL: https://github.com/apache/kafka/pull/12387#issuecomment-1189321338

   Thanks @cadonna, I've incorporated your comments and merged to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #12387: KAFKA-10199: Add RESUME in state updater

2022-07-19 Thread GitBox


guozhangwang merged PR #12387:
URL: https://github.com/apache/kafka/pull/12387


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12387: KAFKA-10199: Add RESUME in state updater

2022-07-19 Thread GitBox


guozhangwang commented on code in PR #12387:
URL: https://github.com/apache/kafka/pull/12387#discussion_r924732715


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -723,6 +723,106 @@ private void shouldNotPauseTaskInRemovedTasks(final Task 
task) throws Exception
 verifyPausedTasks();
 }
 
+@Test
+public void shouldResumeActiveStatefulTask() throws Exception {
+final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldResumeStatefulTask(task);
+verify(changelogReader, times(2)).enforceRestoreActive();
+}
+
+@Test
+public void shouldResumeStandbyTask() throws Exception {
+final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldResumeStatefulTask(task);
+verify(changelogReader, times(2)).transitToUpdateStandby();
+}
+
+private void shouldResumeStatefulTask(final Task task) throws Exception {
+stateUpdater.start();
+stateUpdater.add(task);
+
+stateUpdater.pause(task.id());
+
+verifyPausedTasks(task);
+verifyUpdatingTasks();
+
+stateUpdater.resume(task.id());
+
+verifyPausedTasks();
+verifyUpdatingTasks(task);
+}
+
+@Test
+public void shouldIgnoreResumingNotPausedTasks() throws Exception {

Review Comment:
   ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12387: KAFKA-10199: Add RESUME in state updater

2022-07-19 Thread GitBox


guozhangwang commented on code in PR #12387:
URL: https://github.com/apache/kafka/pull/12387#discussion_r924731119


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -428,6 +456,113 @@ private void shouldRemoveStatefulTask(final Task task) 
throws Exception {
 verify(changelogReader).unregister(task.changelogPartitions());
 }
 
+@Test
+public void shouldPauseActiveStatefulTask() throws Exception {
+final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldPauseStatefulTask(task);
+verify(changelogReader, never()).transitToUpdateStandby();
+}
+
+@Test
+public void shouldPauseStandbyTask() throws Exception {
+final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldPauseStatefulTask(task);
+verify(changelogReader, times(1)).transitToUpdateStandby();
+}
+
+@Test
+public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws 
Exception {
+final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+
+stateUpdater.start();
+stateUpdater.add(task1);
+stateUpdater.add(task2);
+
+stateUpdater.pause(task1.id());
+
+verifyPausedTasks(task1);
+verifyCheckpointTasks(true, task1);
+verifyRestoredActiveTasks();
+verifyRemovedTasks();
+verifyUpdatingTasks(task2);
+verifyExceptionsAndFailedTasks();
+verify(changelogReader, times(1)).enforceRestoreActive();
+verify(changelogReader, times(1)).transitToUpdateStandby();
+}
+
+private void shouldPauseStatefulTask(final Task task) throws Exception {
+stateUpdater.start();
+stateUpdater.add(task);
+
+stateUpdater.pause(task.id());
+
+verifyPausedTasks(task);
+verifyCheckpointTasks(true, task);
+verifyRestoredActiveTasks();
+verifyRemovedTasks();
+verifyUpdatingTasks();
+verifyExceptionsAndFailedTasks();
+}
+
+@Test
+public void shouldResumeActiveStatefulTask() throws Exception {
+final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldResumeStatefulTask(task);
+verify(changelogReader, times(2)).enforceRestoreActive();
+}
+
+@Test
+public void shouldResumeStandbyTask() throws Exception {
+final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldResumeStatefulTask(task);
+verify(changelogReader, times(2)).transitToUpdateStandby();
+}
+
+private void shouldResumeStatefulTask(final Task task) throws Exception {
+stateUpdater.start();
+stateUpdater.add(task);
+
+stateUpdater.pause(task.id());
+
+verifyPausedTasks(task);
+verifyUpdatingTasks();
+
+stateUpdater.resume(task.id());
+
+verifyPausedTasks();
+verifyUpdatingTasks(task);
+}
+
+@Test
+public void shouldRemovePausedTask() throws Exception {
+final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+stateUpdater.start();
+stateUpdater.add(task);
+
+stateUpdater.pause(task.id());
+
+verifyPausedTasks(task);
+verifyCheckpointTasks(true, task);
+verifyRestoredActiveTasks();
+verifyRemovedTasks();
+verifyUpdatingTasks();
+
+stateUpdater.remove(task.id());
+
+verifyPausedTasks();
+verifyRemovedTasks(task);
+verifyRestoredActiveTasks();
+verifyUpdatingTasks();
+
+stateUpdater.resume(task.id());
+
+verifyPausedTasks();
+verifyRemovedTasks(task);
+verifyRestoredActiveTasks();
+verifyUpdatingTasks();
+}

Review Comment:
   Sounds good, ack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

2022-07-19 Thread GitBox


dengziming commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1189293753

   @jsancio I think the last question is that there may be more than one registration record for each broker after restarting it, so can we rely on the broker epoch? I think I need some time to check the logic before making a final decision.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12409: KAFKA-14058: Migrate ExactlyOnceWorkerSourceTaskTest from EasyMock and PowerMock to Mockito

2022-07-19 Thread GitBox


clolov commented on PR #12409:
URL: https://github.com/apache/kafka/pull/12409#issuecomment-1189292431

   Oh, apologies, I didn't get a notification for this. Yep, I will review it 
shortly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov commented on pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-19 Thread GitBox


clolov commented on PR #12422:
URL: https://github.com/apache/kafka/pull/12422#issuecomment-1189290301

   @C0urante I believe you might have the required context to review this pull 
request :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] clolov opened a new pull request, #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito

2022-07-19 Thread GitBox


clolov opened a new pull request, #12422:
URL: https://github.com/apache/kafka/pull/12422

   Addressing https://issues.apache.org/jira/browse/KAFKA-13982.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #12421: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handle…

2022-07-19 Thread GitBox


mjsax merged PR #12421:
URL: https://github.com/apache/kafka/pull/12421


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14079) Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"

2022-07-19 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568626#comment-17568626
 ] 

Randall Hauch commented on KAFKA-14079:
---

Following up with some additional detail:

This issue can affect users that are upgrading to AK 3.2.0, even if they don't modify any Connect worker config or connector configurations. For example, if a user has a pre-AK 3.2.0 Connect installation running with one or more source connector configurations that use {{error.tolerance=all}}, then when that Connect installation is upgraded to AK 3.2.0 _and_ the producer subsequently fails to send and ack messages generated by the source connector (e.g., message too large), Connect will continue to write records to topics but will no longer commit source offsets for that connector. As mentioned above, Connect will accumulate those additional records in memory, causing the worker to eventually fail with an OOM.

Unfortunately, restarting is not likely to be helpful, either: the source 
offsets are not changed/committed once this condition happens, so upon restart 
the connector will resume from the previously-committed source offsets and will 
likely regenerate the same problematic messages as before, triggering the 
problem again and causing the same OOM.

The only way to recover is to fix the underlying problem reported by the producer (e.g., message too large) and restart the Connect workers. Luckily, the problems reported by the producer are captured in the worker logs.

Note that changing the connector configuration to use {{error.tolerance=none}} will cause the connector to stop/fail as soon as the producer fails to write a record to the topic (e.g., message too large), and will not generate duplicate messages beyond the first problematic one (as happens with {{error.tolerance=all}}). But again, the underlying problem must be corrected before the connector can be restarted successfully.

This issue does not affect:
 * sink connectors;
 * source connector configurations that use {{{}error.tolerance=none{}}}, which 
is the default behavior; or
 * source connectors that never use or rely upon source offsets (a smallish 
fraction of all source connector types)

Most source connectors do rely upon source offsets, though, so this is a fairly 
serious issue.

Thanks, [~cshannon] and [~ChrisEgerton] for the quick work and review of these 
PRs. Both PRs linked above (one for the `trunk` branch and one for the `3.2` 
branch) have been merged. The `3.2` PR was merged before the first 3.2.1 RC, 
and so the AK 3.2.1 release should include this fix.
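
For illustration, a minimal, self-contained sketch of the head-of-deque bookkeeping described above (the names are simplified stand-ins for Connect's {{SubmittedRecords}}, not the actual internals):
{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Simplified model: offsets are only committed for the longest acked prefix
// of the deque, so one permanently un-acked record at the head blocks every
// record submitted after it from ever being committed or removed.
class SubmittedRecordsSketch {
    static class Submitted {
        final String partition;
        final long offset;
        boolean acked;
        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    final Deque<Submitted> deque = new ArrayDeque<>();
    final Map<String, Long> committableOffsets = new HashMap<>();

    void drainAckedRecords() {
        while (!deque.isEmpty() && deque.peekFirst().acked) {
            Submitted r = deque.pollFirst();
            committableOffsets.put(r.partition, r.offset);
        }
        // If the head record is never acked (e.g. its send failed and the
        // record was skipped), nothing behind it is drained: offsets stop
        // advancing and the deque grows until OOM, which is the leak described
        // above. Acking skipped records restores progress.
    }
}
{code}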

> Source task will not commit offsets and develops memory leak if 
> "error.tolerance" is set to "all"
> -
>
> Key: KAFKA-14079
> URL: https://issues.apache.org/jira/browse/KAFKA-14079
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Christopher L. Shannon
>Assignee: Christopher L. Shannon
>Priority: Critical
> Fix For: 3.3.0, 3.2.1
>
>
> KAFKA-13348 added the ability to ignore producer exceptions by setting 
> {{error.tolerance}} to {{all}}.  When this is set to all, a null record 
> metadata is passed to commitRecord() and the task continues.
> The issue is that records are tracked by {{SubmittedRecords}}, and the first 
> time an error happens the code does not ack the record with the error and 
> just skips it, so it will not have its offsets committed or be removed from 
> SubmittedRecords before calling commitRecord(). 
> This leads to a bug where future offsets won't be committed anymore and also 
> a memory leak, because the algorithm that removes acked records from the 
> internal map to commit offsets 
> [looks|https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177]
>  at the head of the Deque where the records are tracked, and if it sees that 
> the record is unacked it will not process any more removals. This leads to 
> all new records that go through the task continuing to be added without 
> having their offsets committed, and never being removed from tracking, until 
> an OOM error occurs.
> The fix is to make sure to ack the failed records so they can have their 
> offsets committed and be removed from tracking. This is fine to do as the 
> records are intended to be skipped and not reprocessed. Metrics also need to 
> be updated as well.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rhauch commented on pull request #12412: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL

2022-07-19 Thread GitBox


rhauch commented on PR #12412:
URL: https://github.com/apache/kafka/pull/12412#issuecomment-1189234384

   As you know, the AK project is actively working on a 3.2.1 release; see the 
[Dev List thread](https://lists.apache.org/list?d...@kafka.apache.org:2022-7). 
I've responded to the thread mentioning this issue is no longer a blocker, and 
should appear in the forthcoming 3.2.1 release.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aiquestion commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189224423

   @showuon Thanks a lot for your comment. 
   I think I've fixed all the comments. And yes, I can start a new PR to do it if you think the current code is okay to merge and just needs some refinement.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924677278


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1299,6 +1377,178 @@ public void 
testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
 }
 }
 
+@Test
+public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));
+ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
+new Metrics(),
+assignors,
+true,
+subscriptions);
+client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+subscriptions.subscribe(singleton(topic1), rebalanceListener);
+client.prepareResponse(joinGroupFollowerResponse(1, consumerId, 
"leader", Errors.NONE));
+client.prepareResponse(syncGroupResponse(singletonList(t1p), 
Errors.NONE));
+coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE));

Review Comment:
   Thanks! the test looks much simpler



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676827


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1299,6 +1377,178 @@ public void 
testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
 }
 }
 
+@Test
+public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+rebalanceConfig = buildRebalanceConfig(Optional.of("group-id"));

Review Comment:
   fixed, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676541


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1251,6 +1252,83 @@ public void 
testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
 assertFalse(coordinator.rejoinNeededOrPending());
 }
 
+@Test
+public void testPatternSubscribeAndRejoinGroupAfterTopicDelete() {

Review Comment:
   i've deleted it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676347


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+"org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+"org.apache.kafka.clients.consumer.RangeAssignor"))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+// create 2 consumers
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"rebalance-and-rejoin-group")
+
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
 assignmentStrategy)
+this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+val consumer1 = createConsumer()
+val consumer2 = createConsumer()
+
+// create a new topic, have 2 partitions
+val topic = "topic1"
+val producer = createProducer()
+val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+assertEquals(0, consumer1.assignment().size)
+assertEquals(0, consumer2.assignment().size)
+
+val lock = new ReentrantLock()
+var generationId1 = -1
+var memberId1 = ""
+val customRebalanceListener = new ConsumerRebalanceListener {
+  override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
+  }
+  override def onPartitionsAssigned(partitions: 
util.Collection[TopicPartition]): Unit = {
+if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+  fail(s"Time out while awaiting for lock.")
+}
+try {
+  generationId1 = consumer1.groupMetadata().generationId()
+  memberId1 = consumer1.groupMetadata().memberId()
+} finally {
+  lock.unlock()
+}
+  }
+}
+val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), 
Set.empty, customRebalanceListener)
+consumerPoller1.start()
+TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == 
expectedAssignment,
+  s"Timed out while awaiting expected assignment change to 
$expectedAssignment.")
+
+// Since the consumer1 already completed the rebalance,
+// the `onPartitionsAssigned` rebalance listener will be invoked to set 
the generationId and memberId
+var stableGeneration = -1
+var stableMemberId1 = ""
+if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+  fail(s"Time out while awaiting for lock.")
+}
+try {
+  stableGeneration = generationId1
+  stableMemberId1 = memberId1
+} finally {
+  lock.unlock()
+}
+
+val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, 
List(topic))
+TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 
1,
+  s"Timed out while awaiting expected assignment change to 1.")
+TestUtils.waitUntilTrue(() => consumerPoller2.consumerAssignment().size == 
1,
+  s"Timed out while awaiting expected assignment change to 1.")
+
+if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) {
+  fail(s"Time out while awaiting for lock.")
+}
+try {
+  if 
(assignmentStrategy.equals(classOf[CooperativeStickyAssignor].getName)) {
+// cooperative rebalance should rebalance twice before finally stable
+assertEquals(stableGeneration + 2, generationId1)
+  } else {
+// eager rebalance should rebalance once before finally stable

Review Comment:
   fixed, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924676087


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map
-RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-// return true when
-// 1. future is null, which means no commit request sent, so it is still considered completed
-// 2. offset commit completed
-// 3. offset commit failed with non-retriable exception
-if (future == null)
-    onJoinPrepareAsyncCommitCompleted = true;
-else if (future.succeeded())
-    onJoinPrepareAsyncCommitCompleted = true;
-else if (future.failed() && !future.isRetriable()) {
-    log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage());
-    onJoinPrepareAsyncCommitCompleted = true;
+// and there is no in-flight offset commit request
+if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+    autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
 }
 
+// wait for commit offset response before timer expired.
+if (autoCommitOffsetRequestFuture != null) {
+    Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ?
+            timer : joinPrepareTimer;
+    client.poll(autoCommitOffsetRequestFuture, pollTimer);
+    timer.update();
+    joinPrepareTimer.update();
+}
+
+// keep retrying the offset commit when:
+// 1. offset commit haven't done (and joinPrepareTime not expired)
+// 2. failed with retryable exception (and joinPrepareTime not expired)
+// Otherwise, continue to revoke partitions, ex:
+// 1. if joinPrepareTime has expired
+// 2. if offset commit failed with no-retryable exception
+// 3. if offset commit success
+boolean onJoinPrepareAsyncCommitCompleted = true;
+if (autoCommitOffsetRequestFuture != null) {
+    if (joinPrepareTimer.isExpired()) {
+        log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout. Will continue to join group");
+    } else if (!autoCommitOffsetRequestFuture.isDone()) {
+        onJoinPrepareAsyncCommitCompleted = false;
+    } else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) {
+        log.debug("Asynchronous auto-commit of offsets failed with retryable error: {}. Will retry it.",
+                autoCommitOffsetRequestFuture.exception().getMessage());
+        onJoinPrepareAsyncCommitCompleted = false;
+    } else if (autoCommitOffsetRequestFuture.failed() && !autoCommitOffsetRequestFuture.isRetriable()) {
+        log.error("Asynchronous auto-commit of offsets failed: {}. Will continue to join group.",
+                autoCommitOffsetRequestFuture.exception().getMessage());
+    }
+    if (autoCommitOffsetRequestFuture.isDone()) {
+        autoCommitOffsetRequestFuture = null;
+    }
+}
+if (!onJoinPrepareAsyncCommitCompleted) {
+    timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs));

Review Comment:
   You are right, I just tried to update every timer in my last commit. :-) Fixed, thanks.



##
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##
@@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array(
+"org.apache.kafka.clients.consumer.CooperativeStickyAssignor",
+"org.apache.kafka.clients.consumer.RangeAssignor"))
+  def testRebalanceAndRejoin(assignmentStrategy: String): Unit = {
+// create 2 consumers
+this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
"rebalance-and-rejoin-group")
+
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
 assignmentStrategy)
+this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+val consumer1 = createConsumer()
+val consumer2 = createConsumer()
+
+// create a new topic, have 2 partitions
+val topic = "topic1"
+val producer = createProducer()
+val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100)
+
+assertEquals(0, consumer1.assignment().size)
+assertEquals(0, consumer2.assignment().size)
+
+val lock = new ReentrantLock()
+var generationId1 = -1
+var memberId1 = ""
+val customRebalanceListener = new ConsumerRebalanceListener {
+  override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
+  }
+  override def onPartitionsAssigned(partitions: 

[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924675152


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map
-RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-// return true when
-// 1. future is null, which means no commit request sent, so it is 
still considered completed
-// 2. offset commit completed
-// 3. offset commit failed with non-retriable exception
-if (future == null)
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.succeeded())
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.failed() && !future.isRetriable()) {
-log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-onJoinPrepareAsyncCommitCompleted = true;
+// and there is no in-flight offset commit request
+if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
 }
 
+// wait for commit offset response before timer expired.
+if (autoCommitOffsetRequestFuture != null) {
+Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+   timer : joinPrepareTimer;
+client.poll(autoCommitOffsetRequestFuture, pollTimer);
+timer.update();

Review Comment:
   fixed, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924674693


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java:
##
@@ -191,7 +191,7 @@ public AbstractCoordinator(GroupRebalanceConfig 
rebalanceConfig,
  * @param memberId The identifier of this member in the previous group or 
"" if there was none
  * @return true If onJoinPrepare async commit succeeded, false otherwise
  */
-protected abstract boolean onJoinPrepare(int generation, String memberId);
+protected abstract boolean onJoinPrepare(Timer timer, int generation, 
String memberId);

Review Comment:
   fixed, thanks



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map
-RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-// return true when
-// 1. future is null, which means no commit request sent, so it is 
still considered completed
-// 2. offset commit completed
-// 3. offset commit failed with non-retriable exception
-if (future == null)
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.succeeded())
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.failed() && !future.isRetriable()) {
-log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-onJoinPrepareAsyncCommitCompleted = true;
+// and there is no in-flight offset commit request
+if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
 }
 
+// wait for commit offset response before timer expired.
+if (autoCommitOffsetRequestFuture != null) {
+Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+   timer : joinPrepareTimer;
+client.poll(autoCommitOffsetRequestFuture, pollTimer);
+timer.update();
+joinPrepareTimer.update();
+}
+
+// keep retrying the offset commit when:
+// 1. offset commit haven't done (and joinPrepareTime not expired)
+// 2. failed with retryable exception (and joinPrepareTime not expired)

Review Comment:
   fixed, thanks



##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map
-RequestFuture<Void> future = maybeAutoCommitOffsetsAsync();
-// return true when
-// 1. future is null, which means no commit request sent, so it is 
still considered completed
-// 2. offset commit completed
-// 3. offset commit failed with non-retriable exception
-if (future == null)
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.succeeded())
-onJoinPrepareAsyncCommitCompleted = true;
-else if (future.failed() && !future.isRetriable()) {
-log.error("Asynchronous auto-commit of offsets failed: {}", 
future.exception().getMessage());
-onJoinPrepareAsyncCommitCompleted = true;
+// and there is no in-flight offset commit request
+if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) {
+autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync();
 }
 
+// wait for commit offset response before timer expired.
+if (autoCommitOffsetRequestFuture != null) {
+Timer pollTimer = timer.remainingMs() < 
joinPrepareTimer.remainingMs() ?
+   timer : joinPrepareTimer;
+client.poll(autoCommitOffsetRequestFuture, pollTimer);
+timer.update();
+joinPrepareTimer.update();
+}
+
+// keep retrying the offset commit when:

Review Comment:
   fixed, thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924670900


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##
@@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final 
Map

[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924667928


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
 if (log.isTraceEnabled()) {
     // Log the message here, because we don't know the partition before that.
-    log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+    log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
 }
+
+// Reset record to null here so that it doesn't have to be alive as long as the batch is.
+record = null;

Review Comment:
   Since the only info we need from record is record.partition(), could we keep 
record.partition() in the instance instead of the whole record? Since 
record.partition() is much smaller, maybe there is no need to nullify it in 
setPartition()?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] MPeli commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2022-07-19 Thread GitBox


MPeli commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1189166900

   I also had a problem with KRaft, which did not start on Windows. The latest commit, https://github.com/apache/kafka/pull/12331/commits/f7cc920771735576d9cfba2afe6f26fdcfb2ccd4, fixes it. The commit https://github.com/apache/kafka/pull/12331/commits/77ae23d816ea1a30a3ec970b7dfe77fd35f7fe00 most likely fixes [KAFKA-7575](https://issues.apache.org/jira/browse/KAFKA-7575) and [KAFKA-2427](https://issues.apache.org/jira/browse/KAFKA-2427), but I will have to do more testing to confirm that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] splett2 commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.

2022-07-19 Thread GitBox


splett2 commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r923930734


##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
##
@@ -1878,6 +1878,97 @@ class SocketServerTest {
 }, false)
   }
 
+  /**
   * Tests for KAFKA-13559 - Processing a request got delayed by 300 ms under the following conditions:
   * 1. Client-Server communication uses an SSL socket.
   * 2. More than one request is read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer 
has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket 
and put it in the buffer
+   *  (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in 
SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in 
SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This 
request is read from
+   * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, 
"select(timeout)" in Selector.poll()
+   * should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" 
method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+shutdownServerAndMetrics(server)
+
+// to ensure we only have 1 connection (channel)
+val props = sslServerProps
+val numConnections = 1
+props.put("max.connections.per.ip", numConnections.toString)
+
+// create server with SSL listener
+val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+testableServer.enableRequestProcessing(Map.empty)
+//   
dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]
+val testableSelector = testableServer.testableSelector
+val proxyServer = new ProxyServer(testableServer)
+
+try {
+  // trigger SSL handshake by sending the first request and receiving its 
response without buffering
+  val requestBytes = producerRequestBytes()
+  val sslSocket = sslClientSocket(proxyServer.localPort)
+
+  sendRequest(sslSocket, requestBytes)
+  val request1 = receiveRequest(testableServer.dataPlaneRequestChannel)
+  processRequest(testableServer.dataPlaneRequestChannel, request1)
+  receiveResponse(sslSocket)
+
+  // then put 2 requests in SslTransportLayer.netReadBuffer via the 
ProxyServer
+  val connectionId = request1.context.connectionId
+  val listener = 
testableServer.config.dataPlaneListeners.head.listenerName.value
+  val channel = 
testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw
 new IllegalStateException("Channel not found"))
+  val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, 
classOf[KafkaChannel], "transportLayer")
+  val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, 
classOf[SslTransportLayer], "netReadBuffer")
+
+  proxyServer.enableBuffering(netReadBuffer)
+  sendRequest(sslSocket, requestBytes)
+  sendRequest(sslSocket, requestBytes)
+
+  val keysWithBufferedRead: util.Set[SelectionKey] = 
JTestUtils.fieldValue(testableSelector, classOf[Selector], 
"keysWithBufferedRead")
+  keysWithBufferedRead.add(channel.selectionKey)
+  JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true)
+
+  // process the first request in the server side
+  // this would move bytes from netReadBuffer to appReadBuffer, then 
process only the first request
+  processRequest(testableServer.dataPlaneRequestChannel)
+
+  // receive response in the client side
+  receiveResponse(sslSocket)
+
+  // process the second request in the server side
+  // this would process the second request in the appReadBuffer
+  // NOTE: this should not block because the data is already in the 
buffer, but without the fix for KAFKA-13559,
+  // this step will take more than 300 ms
+  val processTimeStart = System.currentTimeMillis()
+  processRequest(testableServer.dataPlaneRequestChannel)
+  val processTimeEnd = System.currentTimeMillis()
+
+  // receive response in the client side
+  receiveResponse(sslSocket)

Review Comment:
   I'm trying to think about how we can get stronger sequencing guarantees here.
   
   Maybe we can do something like explicitly set the testable selector's 
pollTimeoutOverride to some large value and explicitly call wakeup once we have 
the right state setup.



##
core/src/test/scala/unit/kafka/network/SocketServerTest.scala:

[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance

2022-07-19 Thread GitBox


aiquestion commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r924600012


##
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##
@@ -1251,6 +1252,83 @@ public void 
testForceMetadataRefreshForPatternSubscriptionDuringRebalance() {
 assertFalse(coordinator.rejoinNeededOrPending());
 }
 
+@Test
+public void testPatternSubscribeAndRejoinGroupAfterTopicDelete() {

Review Comment:
   Actually, I wrote it to show the behavior of the [KAFKA-13310](https://issues.apache.org/jira/browse/KAFKA-13310) case after this change.
   It's not a case I want to protect from being changed; I think I can delete it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568598#comment-17568598
 ] 

ASF GitHub Bot commented on KAFKA-13868:


tombentley commented on code in PR #420:
URL: https://github.com/apache/kafka-site/pull/420#discussion_r924595805


##
css/fonts.css:
##
@@ -0,0 +1,82 @@
+/* cutive-mono-regular - latin-ext_latin */
+@font-face {
+font-family: 'Cutive Mono';
+font-style: normal;
+font-weight: 400;
+src: local(''),

Review Comment:
   Ah, thanks! 





> Website updates to satisfy Apache privacy policies
> --
>
> Key: KAFKA-13868
> URL: https://issues.apache.org/jira/browse/KAFKA-13868
> Project: Kafka
>  Issue Type: Bug
>  Components: website
>Reporter: Mickael Maison
>Assignee: Divij Vaidya
>Priority: Critical
>
> The ASF has updated its privacy policy and all websites must be compliant.
> The full guidelines can be found in 
> [https://privacy.apache.org/faq/committers.html]
> The Kafka website has a few issues, including:
> - It's missing a link to the privacy policy: 
> [https://privacy.apache.org/policies/privacy-policy-public.html]
> - It's using Google Analytics
> - It's using Google Fonts
> - It's using scripts hosted on Cloudflare CDN
> - Embedded videos don't have an image placeholder
> As per the email sent to the PMC, all updates have to be done by July 22.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568584#comment-17568584
 ] 

ASF GitHub Bot commented on KAFKA-13868:


divijvaidya commented on code in PR #420:
URL: https://github.com/apache/kafka-site/pull/420#discussion_r924561314


##
css/fonts.css:
##
@@ -0,0 +1,82 @@
+/* cutive-mono-regular - latin-ext_latin */
+@font-face {
+font-family: 'Cutive Mono';
+font-style: normal;
+font-weight: 400;
+src: local(''),

Review Comment:
   https://stackoverflow.com/a/22835957 explains the rationale (the answer uses a smiley character, but the empty string follows the same logic). Though this is only necessary for IE6-8 (very old browsers) to handle an edge case (a different font with the same name installed locally). I think I would remove it to reduce confusion while reading the code.

   








--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568573#comment-17568573
 ] 

ASF GitHub Bot commented on KAFKA-13868:


tombentley commented on code in PR #420:
URL: https://github.com/apache/kafka-site/pull/420#discussion_r924521925


##
css/fonts.css:
##
@@ -0,0 +1,82 @@
+/* cutive-mono-regular - latin-ext_latin */
+@font-face {
+font-family: 'Cutive Mono';
+font-style: normal;
+font-weight: 400;
+src: local(''),

Review Comment:
   Can you explain why the argument to `local` is the empty string? I'm no CSS 
expert, but from a quick google 
(https://stackoverflow.com/questions/3837249/font-face-src-local-how-to-use-the-local-font-if-the-user-already-has-it)
 it seems like it's supposed to be used for some kind of caching, but not with 
an empty string argument.








--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nicolasguyomar commented on pull request #12378: MINOR : lower Metadata info log to debug for topic ID change

2022-07-19 Thread GitBox


nicolasguyomar commented on PR #12378:
URL: https://github.com/apache/kafka/pull/12378#issuecomment-1189066691

   Hello @jolshan , this is a Confluent Bundle 7.2.0 console producer output, 
producing a "test" message in topic "nicolas"
   
   You'll see that upon receiving the first Metadata response, we log the message that I would like to "hide", but you're probably right that we should not log it at all.
   
   [2022-07-19 15:35:41,214] INFO [Producer clientId=console-producer] 
Resetting the last seen epoch of partition nicolas-0 to 0 since the associated 
topicId changed from null to 7OdYyViuRY2XmggNWuP_Vg 
(org.apache.kafka.clients.Metadata)
   
   
   ```
   ./kafka-console-producer --bootstrap-server=localhost:9092 --topic nicolas
   [2022-07-19 15:35:30,945] INFO Registered kafka:type=kafka.Log4jController 
MBean (kafka.utils.Log4jControllerRegistration$)
   [2022-07-19 15:35:31,012] INFO ProducerConfig values: 

(org.apache.kafka.clients.producer.ProducerConfig)
   [2022-07-19 15:35:31,037] INFO [Producer clientId=console-producer] 
Instantiated an idempotent producer. 
(org.apache.kafka.clients.producer.KafkaProducer)
   [2022-07-19 15:35:31,129] INFO Successfully logged in. 
(org.apache.kafka.common.security.authenticator.AbstractLogin)
   [2022-07-19 15:35:31,227] WARN The configuration 'sasl.mechanisms' was 
supplied but isn't a known config. 
(org.apache.kafka.clients.producer.ProducerConfig)
   [2022-07-19 15:35:31,227] WARN The configuration 'schema.registry.url' was 
supplied but isn't a known config. 
(org.apache.kafka.clients.producer.ProducerConfig)
   [2022-07-19 15:35:31,227] WARN The configuration 
'schema.registry.basic.auth.credentials.source' was supplied but isn't a known 
config. (org.apache.kafka.clients.producer.ProducerConfig)
   [2022-07-19 15:35:31,227] WARN The configuration 'sasl.username' was 
supplied but isn't a known config. 
(org.apache.kafka.clients.producer.ProducerConfig)
   [2022-07-19 15:35:31,227] WARN The configuration 
'schema.registry.basic.auth.user.info' was supplied but isn't a known config. 
(org.apache.kafka.clients.producer.ProducerConfig)
   [2022-07-19 15:35:31,227] WARN The configuration 'sasl.password' was 
supplied but isn't a known config. 
(org.apache.kafka.clients.producer.ProducerConfig)
   [2022-07-19 15:35:31,230] INFO Kafka version: 7.2.0-ce 
(org.apache.kafka.common.utils.AppInfoParser)
   [2022-07-19 15:35:31,230] INFO Kafka commitId: 
510078ca78367bd2542eaca9d26ebd22c5ca95cc 
(org.apache.kafka.common.utils.AppInfoParser)
   [2022-07-19 15:35:31,230] INFO Kafka startTimeMs: 1658237731227 
(org.apache.kafka.common.utils.AppInfoParser)
   >[2022-07-19 15:35:32,290] INFO [Producer clientId=console-producer] Cluster 
ID: lkc-xrrwg (org.apache.kafka.clients.Metadata)
   [2022-07-19 15:35:32,293] INFO [Producer clientId=console-producer] 
ProducerId set to 11385814 with epoch 0 
(org.apache.kafka.clients.producer.internals.TransactionManager)
   test
   [2022-07-19 15:35:41,214] INFO [Producer clientId=console-producer] 
Resetting the last seen epoch of partition nicolas-0 to 0 since the associated 
topicId changed from null to 7OdYyViuRY2XmggNWuP_Vg 
(org.apache.kafka.clients.Metadata)
   >^C[2022-07-19 15:35:55,532] INFO [Producer clientId=console-producer] 
Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
(org.apache.kafka.clients.producer.KafkaProducer)
   [2022-07-19 15:35:55,557] INFO Metrics scheduler closed 
(org.apache.kafka.common.metrics.Metrics)
   [2022-07-19 15:35:55,557] INFO Closing reporter 
org.apache.kafka.common.metrics.JmxReporter 
(org.apache.kafka.common.metrics.Metrics)
   [2022-07-19 15:35:55,557] INFO Metrics reporters closed 
(org.apache.kafka.common.metrics.Metrics)
   [2022-07-19 15:35:55,558] INFO App info kafka.producer for console-producer 
unregistered (org.apache.kafka.common.utils.AppInfoParser)
   
   
   ```
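   For context, the change under discussion would look roughly like the sketch below. This is only an illustration: the class and method names are hypothetical, not the actual `org.apache.kafka.clients.Metadata` internals; only the swap from `log.info` to `log.debug` reflects what the PR proposes.
   
   ```java
   import org.apache.kafka.common.TopicPartition;
   import org.apache.kafka.common.Uuid;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   // Hypothetical sketch: demote the topic-ID epoch-reset message from INFO to
   // DEBUG so console clients stay quiet on the first metadata response.
   class MetadataLogSketch {
       private static final Logger log = LoggerFactory.getLogger(MetadataLogSketch.class);
   
       void onTopicIdChanged(TopicPartition tp, int epoch, Uuid oldId, Uuid newId) {
           // Previously logged at INFO; the PR lowers it to DEBUG.
           log.debug("Resetting the last seen epoch of partition {} to {} since the associated "
                   + "topicId changed from {} to {}", tp, epoch, oldId, newId);
       }
   }
   ```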


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568566#comment-17568566
 ] 

ASF GitHub Bot commented on KAFKA-13868:


divijvaidya commented on PR #424:
URL: https://github.com/apache/kafka-site/pull/424#issuecomment-1189065641

   @mimaison please review for GDPR compliance.







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568565#comment-17568565
 ] 

ASF GitHub Bot commented on KAFKA-13868:


divijvaidya opened a new pull request, #424:
URL: https://github.com/apache/kafka-site/pull/424

   As per the [ASF privacy policy](https://privacy.apache.org/faq/committers.html), Google Analytics should be replaced with the Apache-hosted version of Matomo to remain compliant with GDPR.
   
   Email thread where we received the site Id that is used with Matomo: 
https://lists.apache.org/thread/0rpo0ffcd70c2yxfnqfqk43oyg7c8x8d 
   
   ## Code changes
   - Remove Google Analytics script
   - Remove `google-site-verification` files, which are used by Google to verify the ownership of the site.
   - Add the Matomo script to the `<head>` section (all JS scripts should ideally be placed there and not in `<body>`).
   
   ## Results
   After deploying the changes, we should be able to analyse the results at https://analytics.apache.org/index.php?module=MultiSites&action=index&idSite=1&period=day&date=yesterday







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568535#comment-17568535
 ] 

ASF GitHub Bot commented on KAFKA-13868:


showuon commented on PR #420:
URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1189006953

   
   
   > That is a very fair question to ask. The number of actual font files is smaller because the new font files contain all the charsets in one file.
   
   I see. Thanks.
   
   > I will wait till tomorrow for others to add their thoughts. After that I can update this PR to use the approach that you mentioned. Sounds OK, Luke?
   
   Yes, let's see what other people think. But I don't think we need to adopt my approach if the font output is the same. 
   
   Thank you. 







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568515#comment-17568515
 ] 

ASF GitHub Bot commented on KAFKA-13868:


mimaison merged PR #421:
URL: https://github.com/apache/kafka-site/pull/421







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] viktorsomogyi commented on pull request #11886: KAFKA-13730: OAuth access token validation fails if it does not conta…

2022-07-19 Thread GitBox


viktorsomogyi commented on PR #11886:
URL: https://github.com/apache/kafka/pull/11886#issuecomment-1188969830

   @rajinisivaram @omkreddy can you help get this reviewed?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568483#comment-17568483
 ] 

ASF GitHub Bot commented on KAFKA-13868:


divijvaidya commented on PR #420:
URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1188923709

   > We can download the font via 
https://fonts.gstatic.com/s/cutivemono/v14/m8JWjfRfY7WVjVi2E-K9H6RMTm6o39ucNvc.woff2
   
   Oh yes. We can do that, but I think that would be less maintainable. This is because when a new version of these fonts is available, we won't have a mechanism to update them. I can go ahead with this approach too if that makes it safer to accept this change. I don't have any strong opinions on this.
   
   > Does that mean we actually only use 9 of them? How do you know that?
   
   That is a very fair question to ask. The number of actual font files is smaller because the new font files contain all the charsets in one file.
   
   I will wait till tomorrow for others to add their thoughts. After that I can update this PR to use the approach that you mentioned. Sounds OK, Luke?
   
   
   







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cshannon commented on pull request #12412: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL

2022-07-19 Thread GitBox


cshannon commented on PR #12412:
URL: https://github.com/apache/kafka/pull/12412#issuecomment-1188913781

   @rhauch - Great, thanks for merging this so quickly. I've been running a custom build with this fix for a couple of days and it has looked good so far, so it will be nice to get 3.2.1 out soon.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries

2022-07-19 Thread Adrian Preston (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568465#comment-17568465
 ] 

Adrian Preston commented on KAFKA-14074:


Thanks for pointing out KAFKA-13972, [~jolshan]. Unfortunately I don't think 
this is exactly the same problem. I've built the branch corresponding to the 
pull request in KAFKA-13972 ([https://github.com/apache/kafka/pull/12271]), and 
can still reproduce the stray topic partition directories problem described in 
this issue.

> Restarting a broker during re-assignment can leave log directory entries
> 
>
> Key: KAFKA-14074
> URL: https://issues.apache.org/jira/browse/KAFKA-14074
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0, 3.1.0
>Reporter: Adrian Preston
>Priority: Major
>
> Re-starting a broker while replicas are being assigned away from the broker 
> can result in topic partition directories being left in the broker’s log 
> directory. This can trigger further problems if such a topic is deleted and 
> re-created. These problems occur when replicas for the new topic are placed 
> on a broker that hosts a “stale” topic partition directory of the same name, 
> causing the on-disk topic partition state held by different brokers in the 
> cluster to diverge.
> We have also been able to re-produce variants of this problem using Kafka 2.8 
> and 3.1, as well as Kafka built from the head of the apache/kafka repository 
> (at the time of writing this is commit: 
> 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to 
> re-produce this problem with Kafka running in KRaft mode.
> A minimal re-create for topic directories being left on disk is as follows:
>  # Start ZooKeeper and a broker (both using the sample config)
>  # Create 100 topics: each with 1 partition, and with replication factor 1
>  # Add a second broker to the Kafka cluster (with minor edits to the sample 
> config for: {{{}broker.id{}}}, {{{}listeners{}}}, and {{{}log.dirs{}}})
>  # Issue a re-assignment that moves all of the topic partition replicas  from 
> the first broker to the second broker
>  # While this re-assignment is taking place, shut down the first broker (you 
> need to be quick with only two brokers and 100 topics…)
>  # Wait a few seconds for the re-assignment to stall
>  # Restart the first broker, then wait for the re-assignment to complete and 
> for it to remove any partially deleted topics (e.g. those with a “-delete” suffix).
> Inspecting the logs directory for the first broker should show directories 
> corresponding to topic partitions that are owned by the second broker. These 
> are not cleaned up when the re-assignment completes, and also remain in the 
> logs directory even if the first broker is restarted.  Deleting the topic 
> also does not clean up the topic partitions left behind on the first broker - 
> which leads to a second potential problem.
> For topics that have more than one replica: a new topic that has the same 
> name as a previously deleted topic might have replicas created on a broker 
> with “stale” topic partition directories. If this happens these topics will 
> remain in an under-replicated state.
> A minimal re-create for this is as follows:
>  # Create a three node Kafka cluster (backed by ZK) based off the sample 
> config (to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2)
>  # Create 100 topics: each with 1 partition, and with replication factor 2
>  # Submit a re-assignment to move all of the topic partition replicas to 
> kafka-0 and kafka-1,  and wait for it to complete
>  # Submit a re-assignment to move all of the topic partition replicas on 
> kafka-0 to kafka-2.
>  # While this re-assignment is taking place, shut down and re-start kafka-0.
>  # Wait for the re-assignment to complete, and check that there are unexpected 
> topic partition directories in kafka-0’s logs directory
>  # Delete all 100 topics, and re-create 100 new topics with the same name and 
> configuration as the deleted topics.
> In this state kafka-1 and kafka-2 continually generate log messages similar 
> to:
> {{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition 
> test-039-0. This error may be returned transiently when the partition is 
> being created or deleted, but it is not expected to persist. 
> (kafka.server.ReplicaFetcherThread)}}
> Topics that have had replicas created on kafka-0 are under-replicated with 
> kafka-0 missing from the ISR list. Performing a rolling restart of each 
> broker in turn does not resolve the problem, in fact more partitions are 
> listed as under-replicated, as before kafka-0 is missing from their ISR list.
> I also tried to re-create this with Kafka running 

[GitHub] [kafka] fvaleri commented on pull request #12401: Minor: replace .kafka with .log in implementation documentation

2022-07-19 Thread GitBox


fvaleri commented on PR #12401:
URL: https://github.com/apache/kafka/pull/12401#issuecomment-1188894824

   @showuon is this fine? Do you have any more feedback on this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2022-07-19 Thread GitBox


divijvaidya commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1188878088

   @ijuma, would you like to take a look at this, or perhaps tag a committer who would be best suited to look into this one? (Note that we can review this PR through the lens of changing the file preallocation logic to use the new Java NIO.2 APIs. The new APIs are better at handling Windows, which is nice.)
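   
   As a rough illustration of what moving to the Java NIO.2 APIs can mean here, consider the sketch below. It is an assumed shape, not the PR's actual diff; the class name and the write-a-byte-at-the-end trick are just one portable way to size a file up front.
   
   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;
   import java.nio.file.Path;
   import java.nio.file.StandardOpenOption;
   
   // Hypothetical sketch: open a log file with NIO.2 and optionally extend it
   // to its target size up front.
   final class FilePreallocation {
       static FileChannel openPreallocated(Path file, long size) throws IOException {
           FileChannel channel = FileChannel.open(file,
                   StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
           if (size > 0) {
               // Writing one byte at offset (size - 1) extends the file to
               // `size` bytes; a portable stand-in for platform-specific
               // preallocation.
               channel.write(ByteBuffer.allocate(1), size - 1);
               channel.position(0);
           }
           return channel;
       }
   }
   ```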


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12309: KAFKA-14007: Invoking connect headers.close method on shutdown

2022-07-19 Thread GitBox


vamossagar12 commented on PR #12309:
URL: https://github.com/apache/kafka/pull/12309#issuecomment-1188874838

   @showuon could you please review/merge this? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #12321: KAFKA-14012: Adding null checks for cases when closeQuietly was being passed a lambda object

2022-07-19 Thread GitBox


vamossagar12 commented on PR #12321:
URL: https://github.com/apache/kafka/pull/12321#issuecomment-1188874392

   @showuon could you please review/merge this? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2022-07-19 Thread GitBox


mimaison commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1188867448

   I'm afraid I have no experience running Kafka on Windows so I'm not familiar 
with the issues on that platform.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12387: KAFKA-10199: Add RESUME in state updater

2022-07-19 Thread GitBox


cadonna commented on code in PR #12387:
URL: https://github.com/apache/kafka/pull/12387#discussion_r924277101


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -723,6 +723,106 @@ private void shouldNotPauseTaskInRemovedTasks(final Task 
task) throws Exception
 verifyPausedTasks();
 }
 
+@Test
+public void shouldResumeActiveStatefulTask() throws Exception {
+final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldResumeStatefulTask(task);
+verify(changelogReader, times(2)).enforceRestoreActive();
+}
+
+@Test
+public void shouldResumeStandbyTask() throws Exception {
+final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldResumeStatefulTask(task);
+verify(changelogReader, times(2)).transitToUpdateStandby();
+}
+
+private void shouldResumeStatefulTask(final Task task) throws Exception {
+stateUpdater.start();
+stateUpdater.add(task);
+
+stateUpdater.pause(task.id());
+
+verifyPausedTasks(task);
+verifyUpdatingTasks();
+
+stateUpdater.resume(task.id());
+
+verifyPausedTasks();
+verifyUpdatingTasks(task);
+}
+
+@Test
+public void shouldIgnoreResumingNotPausedTasks() throws Exception {

Review Comment:
   I would rather call this `shouldNotResumeNotExistingTasks()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568439#comment-17568439
 ] 

ASF GitHub Bot commented on KAFKA-13868:


showuon commented on PR #420:
URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1188822349

   @divijvaidya , I see. But in 
https://fonts.googleapis.com/css?family=Cutive+Mono%7CRoboto:100,300,400,700,900%7CRoboto+Condensed:300
 , we can download all the fonts in `woff2` format via the link inside the font 
css, ex: 
   
   ```
   /* latin-ext */
   @font-face {
 font-family: 'Cutive Mono';
 font-style: normal;
 font-weight: 400;
 src: 
url(https://fonts.gstatic.com/s/cutivemono/v14/m8JWjfRfY7WVjVi2E-K9H6RMTm6o39ucNvc.woff2)
 format('woff2');
 unicode-range: U+0100-024F, U+0259, U+1E00-1EFF, U+2020, U+20A0-20AB, 
U+20AD-20CF, U+2113, U+2C60-2C7F, U+A720-A7FF;
   }
   ```
   We can download the font via 
https://fonts.gstatic.com/s/cutivemono/v14/m8JWjfRfY7WVjVi2E-K9H6RMTm6o39ucNvc.woff2
 . 
   
   However, I don't insist that we should download the fonts from Google. I just want to make sure we don't miss any fonts we are using on the Kafka website now. So, my next question is: how do we know we only need these 9 fonts you added? I saw there are 44 `@font-face` rules in this link: 
https://fonts.googleapis.com/css?family=Cutive+Mono%7CRoboto:100,300,400,700,900%7CRoboto+Condensed:300
 . Does that mean we actually only use 9 of them? How do you know that?
   
   Thank you again for helping working on this.







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #12387: KAFKA-10199: Add RESUME in state updater

2022-07-19 Thread GitBox


cadonna commented on code in PR #12387:
URL: https://github.com/apache/kafka/pull/12387#discussion_r924276012


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##
@@ -428,6 +456,113 @@ private void shouldRemoveStatefulTask(final Task task) 
throws Exception {
 verify(changelogReader).unregister(task.changelogPartitions());
 }
 
+@Test
+public void shouldPauseActiveStatefulTask() throws Exception {
+final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldPauseStatefulTask(task);
+verify(changelogReader, never()).transitToUpdateStandby();
+}
+
+@Test
+public void shouldPauseStandbyTask() throws Exception {
+final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldPauseStatefulTask(task);
+verify(changelogReader, times(1)).transitToUpdateStandby();
+}
+
+@Test
+public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws 
Exception {
+final StreamTask task1 = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, 
Collections.singletonList(TOPIC_PARTITION_B_0));
+
+stateUpdater.start();
+stateUpdater.add(task1);
+stateUpdater.add(task2);
+
+stateUpdater.pause(task1.id());
+
+verifyPausedTasks(task1);
+verifyCheckpointTasks(true, task1);
+verifyRestoredActiveTasks();
+verifyRemovedTasks();
+verifyUpdatingTasks(task2);
+verifyExceptionsAndFailedTasks();
+verify(changelogReader, times(1)).enforceRestoreActive();
+verify(changelogReader, times(1)).transitToUpdateStandby();
+}
+
+private void shouldPauseStatefulTask(final Task task) throws Exception {
+stateUpdater.start();
+stateUpdater.add(task);
+
+stateUpdater.pause(task.id());
+
+verifyPausedTasks(task);
+verifyCheckpointTasks(true, task);
+verifyRestoredActiveTasks();
+verifyRemovedTasks();
+verifyUpdatingTasks();
+verifyExceptionsAndFailedTasks();
+}
+
+@Test
+public void shouldResumeActiveStatefulTask() throws Exception {
+final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldResumeStatefulTask(task);
+verify(changelogReader, times(2)).enforceRestoreActive();
+}
+
+@Test
+public void shouldResumeStandbyTask() throws Exception {
+final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+shouldResumeStatefulTask(task);
+verify(changelogReader, times(2)).transitToUpdateStandby();
+}
+
+private void shouldResumeStatefulTask(final Task task) throws Exception {
+stateUpdater.start();
+stateUpdater.add(task);
+
+stateUpdater.pause(task.id());
+
+verifyPausedTasks(task);
+verifyUpdatingTasks();
+
+stateUpdater.resume(task.id());
+
+verifyPausedTasks();
+verifyUpdatingTasks(task);
+}
+
+@Test
+public void shouldRemovePausedTask() throws Exception {
+final StreamTask task = 
createActiveStatefulTaskInStateRestoring(TASK_0_0, 
Collections.singletonList(TOPIC_PARTITION_A_0));
+stateUpdater.start();
+stateUpdater.add(task);
+
+stateUpdater.pause(task.id());
+
+verifyPausedTasks(task);
+verifyCheckpointTasks(true, task);
+verifyRestoredActiveTasks();
+verifyRemovedTasks();
+verifyUpdatingTasks();
+
+stateUpdater.remove(task.id());
+
+verifyPausedTasks();
+verifyRemovedTasks(task);
+verifyRestoredActiveTasks();
+verifyUpdatingTasks();
+
+stateUpdater.resume(task.id());
+
+verifyPausedTasks();
+verifyRemovedTasks(task);
+verifyRestoredActiveTasks();
+verifyUpdatingTasks();
+}

Review Comment:
   I was rather thinking about verifying that removed tasks, failed tasks, and restored tasks cannot be resumed, that is, without pausing them beforehand. You just ensure that a task has failed and then try to resume it. The same applies to restored and removed tasks.
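   
   A sketch of the kind of test being suggested, reusing the helper conventions visible in the diff above (the failure-injection step is assumed, since the harness for it is not shown here):
   
   ```java
   @Test
   public void shouldNotResumeFailedTask() throws Exception {
       final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0,
           Collections.singletonList(TOPIC_PARTITION_A_0));
       stateUpdater.start();
       stateUpdater.add(task);
       // Assumed step: make restoration fail (e.g. via a throwing
       // changelogReader mock) so the task lands in the failed set.
   
       stateUpdater.resume(task.id());
   
       // Resuming a task that was never paused must be a no-op.
       verifyUpdatingTasks();
       verifyPausedTasks();
   }
   ```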



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2022-07-19 Thread GitBox


divijvaidya commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1188795126

   @mimaison perhaps you might be interested in looking at this one? I have 
done an initial review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924206144


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
 if (log.isTraceEnabled()) {
 // Log the message here, because we don't know the partition 
before that.
-log.trace("Attempting to append record {} with callback {} to 
topic {} partition {}", record, userCallback, record.topic(), partition);
+log.trace("Attempting to append record {} with callback {} to 
topic {} partition {}", record, userCallback, topic, partition);
 }
+
+// Reset record to null here so that it doesn't have to be alive 
as long as the batch is.
+record = null;

Review Comment:
   I find that the code complexity to achieve this trace logging is a bit high. 
I have some ideas on how to improve it, but we can leave that for later. A 
simple suggestion for now would be to change `setPartition` to 
`onPartitionAssigned` or something like that. This would indicate a general 
callback that can do anything once the partition is known.
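   
   For illustration, the suggested rename would give the callback a shape roughly like this (the method name is only the reviewer's proposal, not current API, and whether the interface extends `Callback` is an assumption here):
   
   ```java
   // Hypothetical shape after the rename: a general notification invoked once
   // the partition is known, rather than a setter-style method.
   public interface AppendCallbacks extends Callback {
       void onPartitionAssigned(int partition);
   }
   ```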



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568411#comment-17568411
 ] 

ASF GitHub Bot commented on KAFKA-13868:


divijvaidya commented on PR #420:
URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1188739303

   > @divijvaidya , why do you use 
https://google-webfonts-helper.herokuapp.com/fonts/ to download the fonts, 
instead of the original link: 
[https://fonts.googleapis.com/css?family=Cutive+Mono|Roboto:100,300,400,700,900|Roboto+Condensed:300](https://fonts.googleapis.com/css?family=Cutive+Mono%7CRoboto:100,300,400,700,900%7CRoboto+Condensed:300)
 ? Will that make any difference?
   
   Hey @showuon 
   If you visit the link 
https://fonts.googleapis.com/css?family=Cutive+Mono%7CRoboto:100,300,400,700,900%7CRoboto+Condensed:300,
 the `url` still points to Google's CDN and hence does not fulfil our objective of self-hosting the fonts. An alternative could be to download the fonts from the source https://fonts.google.com/, but they only allow downloading a font family in `ttf` format. In that case, we would need a third-party tool to convert the `ttf` files to `woff2` compression. Further, this site, https://fonts.google.com/, does not allow (at least I couldn't find it) downloading different charsets such as Vietnamese, etc.
   
   Note that the helper tool I used, https://google-webfonts-helper.herokuapp.com/fonts/, uses an MIT license and has [10K stars on GitHub](https://github.com/majodev/google-webfonts-helper), so I decided to trust it.







--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924191711


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1465,13 +1465,17 @@ public boolean isDone() {
 private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
 private final Callback userCallback;
 private final ProducerInterceptors<K, V> interceptors;
-private final ProducerRecord<K, V> record;
+private ProducerRecord<K, V> record;
+private final String topic;
 protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
 private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
 this.userCallback = userCallback;
 this.interceptors = interceptors;
 this.record = record;
+// Note a record would be null only if the client application has 
a bug, but we don't want to
+// have NPE here, because the interceptors would not be notified 
(see .doSend).
+topic = record != null ? record.topic() : null;

Review Comment:
   I looked at the test and it seems to check that an exception is thrown? As 
@junrao said, this can be done by validating what `send` receives instead of 
polluting the whole codebase. I'm OK if we file a JIRA for that and do it as a 
separate PR. But we should remove this code when we do that.
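   
   In other words, the validation would move to the public API boundary, roughly like this simplified sketch (the real `send` path does more than shown; the placement of the null check is the assumption being illustrated):
   
   ```java
   // Illustrative only: validate the record once where it enters the producer,
   // so internal plumbing such as AppendCallbacks can assume it is non-null.
   @Override
   public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
       if (record == null)
           throw new NullPointerException("record cannot be null");
       ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
       return doSend(interceptedRecord, callback);
   }
   ```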



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer

2022-07-19 Thread GitBox


ijuma commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924189300


##
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
 
 if (log.isTraceEnabled()) {

Review Comment:
   For the line in this method, you could do something like:
   
   ```java
    if (partition < 0)
        throw new IllegalArgumentException("partition should be non-negative, but it was " + partition);
   ```
   
   This is more informative and idiomatic, and checks the more general case that we expect partitions to be non-negative. But I see that we have sprinkled the same check in other methods, so having an `assertPartitionIsPositive`-style helper would probably be a better approach. In any case, since this code was introduced in a different change, we can file a JIRA and do it as a separate PR.
   
   I am happy to discuss more, but we should be clear about terminology. 
Language level asserts in Java aren't used much. Checking preconditions through 
API boundaries is useful. Within a given boundary, it's best to use the type 
system to avoid having noise all over the code.
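   
   A minimal sketch of that helper idea, assuming it would live as a small static utility (the name follows the suggestion above; nothing here is committed API):
   
   ```java
   // Hypothetical helper: one shared precondition check instead of the same
   // `if (partition < 0) throw ...` sprinkled across methods. Despite the
   // name, zero is allowed, since partitions are zero-indexed.
   private static void assertPartitionIsPositive(int partition) {
       if (partition < 0)
           throw new IllegalArgumentException(
               "partition should be non-negative, but it was " + partition);
   }
   ```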



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies

2022-07-19 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568395#comment-17568395
 ] 

ASF GitHub Bot commented on KAFKA-13868:


showuon commented on PR #420:
URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1188708449

   @scott-confluent , could you help review this PR since you added the google 
font into this project? Thanks.







--
This message was sent by Atlassian Jira
(v8.20.10#820010)

