[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer
artemlivshits commented on PR #12365: URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189805608 Looked at the failed tests, seem unrelated and pass locally. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
showuon commented on code in PR #12349:
URL: https://github.com/apache/kafka/pull/12349#discussion_r925138410

## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ##
@@ -1299,17 +1300,79 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() {
         }
     }
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertFalse(res);
+
+            pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
+            res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterNonRetryableException() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            client.prepareResponse(offsetCommitResponse(singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
+            boolean res = coordinator.onJoinPrepare(pollTimer, generationId, memberId);
+            assertTrue(res);
+
+            assertFalse(client.hasPendingResponses());
+            assertFalse(client.hasInFlightRequests());
+            assertFalse(coordinator.coordinatorUnknown());
+        }
+    }
+
+    @Test
+    public void testOnJoinPrepareWithOffsetCommitShouldKeepJoinAfterRebalanceTimeout() {
+        try (ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, Optional.empty(), false)) {
+            int generationId = 42;
+            String memberId = "consumer-42";
+
+            Timer pollTimer = time.timer(100L);
+            time.sleep(150);

Review Comment:
@aiquestion , could you file another PR to remove this line? This is unneeded, right?
[jira] [Created] (KAFKA-14088) KafkaChannel memory leak
Gao Fei created KAFKA-14088:
-------------------------------

Summary: KafkaChannel memory leak
Key: KAFKA-14088
URL: https://issues.apache.org/jira/browse/KAFKA-14088
Project: Kafka
Issue Type: Bug
Components: network
Affects Versions: 2.2.1
Environment: Current system environment:
    kafka version: 2.2.1
    openjdk(openj9): jdk1.8
    Heap memory: 6.4GB
    MaxDirectSize: 8GB
    Total number of topics: about 150+, each with about 3 partitions
Reporter: Gao Fei

The Kafka broker reports OutOfMemoryError: Java heap space and OutOfMemoryError: Direct buffer memory at the same time. The memory dump shows that the objects occupying the most memory are KafkaChannel->NetworkReceive->HeapByteBuffer. There are about 4 such KafkaChannels, each around 1.5GB, while the total heap allocation is only 6.4GB.

It is strange that a single KafkaChannel occupies so much heap memory. Isn't each batch request gradually written to disk by the RequestHandler thread? Normally the memory held by a KafkaChannel should be released continuously, but here it is not. Why is there such a large HeapByteBuffer object in KafkaChannel, and what does it store? Shouldn't the socket communication here mostly use direct memory? Why is so much heap memory used instead, and why is it not released?

The business data is not very large. The business data of each customer is different: some customers hit this OOM in their environment, while some customers with large business data do not.
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:693)
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:174)
    at sun.nio.ch.IOUtil.read(IOUtil.java:195)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:745)

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:863)
    at kafka.network.Processor.run(SocketServer.scala:762)
    at java.lang.Thread.run(Thread.java:745)

--
This message was sent by Atlassian Jira (v8.20.10#820010)
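Both stack traces pass through `NetworkReceive.readFrom`, and the heap trace shows the buffer being allocated via `MemoryPool.tryAllocate`. The following is a minimal sketch of that kind of allocation, assuming the usual Kafka wire framing of a 4-byte size prefix followed by the payload. `SizePrefixedReceive`, `allocatePayload`, `prefixOf` and the `maxSize` bound are hypothetical names for illustration and are not Kafka's actual implementation. The sketch only illustrates that the payload buffer is sized from the prefix, so one large or malformed request can hold a correspondingly large heap buffer until it has been read completely.

```java
import java.nio.ByteBuffer;

/** Hypothetical illustration of size-prefixed framing; not Kafka's NetworkReceive. */
class SizePrefixedReceive {
    private final int maxSize; // assumed upper bound on a single request, in bytes

    SizePrefixedReceive(int maxSize) {
        this.maxSize = maxSize;
    }

    /**
     * Reads the 4-byte size prefix and allocates a heap buffer for the payload.
     * Rejects sizes outside [0, maxSize] so one request cannot claim unbounded heap.
     */
    ByteBuffer allocatePayload(ByteBuffer sizePrefix) {
        int size = sizePrefix.getInt();
        if (size < 0 || size > maxSize) {
            throw new IllegalStateException("Invalid receive (size = " + size + ")");
        }
        // Heap allocation, comparable to the HeapByteBuffer seen in the dump.
        return ByteBuffer.allocate(size);
    }

    /** Helper that builds a readable 4-byte prefix holding the given size. */
    static ByteBuffer prefixOf(int size) {
        ByteBuffer prefix = ByteBuffer.allocate(4);
        prefix.putInt(size);
        prefix.flip();
        return prefix;
    }
}
```

With a bound of 1024 bytes in this sketch, a prefix announcing 512 bytes yields a 512-byte heap buffer, while a prefix announcing 2048 bytes is rejected before any allocation happens.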
[jira] [Resolved] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen resolved KAFKA-14024.
-------------------------------
Resolution: Fixed

> Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
> ------------------------------------------------------------------------------
>
> Key: KAFKA-14024
> URL: https://issues.apache.org/jira/browse/KAFKA-14024
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.2.0
> Reporter: Shawn Wang
> Assignee: Shawn Wang
> Priority: Blocker
> Labels: new-consumer-threading-should-fix
> Fix For: 3.3.0, 3.2.1
>
> Hi
> In https://issues.apache.org/jira/browse/KAFKA-13310 we tried to fix an issue
> where consumer#poll(duration) could return later than the provided duration.
> This happened because, when a rebalance is needed, the consumer first commits
> the current offsets synchronously before rebalancing. If the offset commit
> takes too long, consumer#poll spends more time than the provided duration. To
> fix that, we changed the synchronous commit to an asynchronous commit before
> the rebalance (i.e. in onJoinPrepare).
>
> However, in this ticket, we found that the async commit keeps sending a new
> commit request during each Consumer#poll, because the offset commit never
> completes in time. The impact is that the existing consumer is kicked out of
> the group after the rebalance timeout without joining the group. That is,
> suppose we have consumer A in group G, and now consumer B joins the group;
> after the rebalance, only consumer B is in the group.
>
> The workaround for this issue is to change the assignor back to an eager
> assignor, e.g. StickyAssignor or RoundRobinAssignor.
>
> To fix the issue, we came up with 2 solutions:
> # We can explicitly wait for the async commit to complete in onJoinPrepare,
> but that would let the KAFKA-13310 issue happen again.
> # We can keep the async offset commit future in flight, so that each
> Consumer#poll waits for that same future to complete.
>
> Besides, another bug was found while fixing this one. Before KAFKA-13310, we
> committed offsets synchronously with rebalanceTimeout, retrying on retriable
> errors until the timeout. After KAFKA-13310, we thought we had a retry, but
> the retry happens after the partitions are revoked. That is, even if the
> retried offset commit succeeds, some partition offsets are still left
> uncommitted, and after the rebalance other consumers will consume overlapping
> records.
>
> ===
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]
>
> We didn't wait for the client to receive the commit offset response here, so
> onJoinPrepareAsyncCommitCompleted will be false in cooperative rebalance, and
> the client will loop invoking onJoinPrepare.
> I think the EAGER mode doesn't have this problem because it will revoke the
> partitions even if onJoinPrepareAsyncCommitCompleted=false and will not try
> to commit in the next round.
>
> reproduce:
> * single node Kafka version 3.2.0 && client version 3.2.0
> * topic1 has 5 partitions
> * start a consumer1 (cooperative rebalance)
> * start another consumer2 (same consumer group)
> * consumer1 will hang for a long time before re-joining
> * from the server log, consumer1 hits the rebalance timeout before joinGroup
> and re-joins with another memberId
>
> consumer1's log keeps printing:
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 (ConsumerCoordinator.java:739)
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of offsets {topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} (ConsumerCoordinator.java:1143)
>
> and coordinator's log:
> [2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 56 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic members who haven't joined: Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a) (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,856] INFO [GroupCoordinator 0]: Stabilized group xxx generation 57 (__consumer_offsets-30)
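The second proposed solution in the ticket, keeping the asynchronous commit future in flight across polls instead of sending a new request on every poll, can be sketched roughly as follows. This is a simplified illustration and not the code merged for KAFKA-14024: `InflightCommitTracker`, its `onJoinPrepare(Supplier)` method and `requestsSent` are hypothetical names, `CompletableFuture` stands in for Kafka's internal request future, and retry, error and timeout handling are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical helper for illustration; not Kafka's ConsumerCoordinator. */
class InflightCommitTracker {
    private CompletableFuture<Void> inflight; // null when no commit is pending
    private int requestsSent = 0;

    /**
     * Called once per poll while a rebalance is pending.
     * Returns true once the commit has completed and the join may proceed.
     */
    boolean onJoinPrepare(Supplier<CompletableFuture<Void>> sendCommit) {
        if (inflight == null) {
            // Only send a new commit when none is outstanding.
            inflight = sendCommit.get();
            requestsSent++;
        }
        if (!inflight.isDone()) {
            return false; // keep waiting on the same request at the next poll
        }
        inflight = null; // completed; clear the slot for the next rebalance
        return true;
    }

    int requestsSent() {
        return requestsSent;
    }
}
```

In this sketch, calling `onJoinPrepare` repeatedly while the future is pending sends exactly one commit request; the looping behaviour described in the ticket corresponds to sending a fresh request on every call.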
[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568783#comment-17568783 ]

Luke Chen commented on KAFKA-14024:
-----------------------------------

> In this way we would not need a separate timer inside the `onJoinPrepare` for the commit itself.

[~guozhang] , thanks for the suggestion. Yes, that looks simpler! I like it. But since release time approaching and there will be new rebalance protocol (KIP-848) coming soon, I'm going to merge it as is. But again, thanks for the comment. I learned something from it. Thanks.

[~aiquestion] , thanks again for finding the issue and the PR!
[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
showuon commented on PR #12349: URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189724526 Backport to 3.3 and 3.2. cc @jsancio @mumrah
[GitHub] [kafka] showuon merged pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
showuon merged PR #12349: URL: https://github.com/apache/kafka/pull/12349
[GitHub] [kafka] showuon commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
showuon commented on PR #12349:
URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189713426

Failed tests are unrelated:
```
Build / JDK 8 and Scala 2.12 / kafka.server.DynamicBrokerReconfigurationTest.testKeyStoreAlter()
```
[GitHub] [kafka] artemlivshits commented on pull request #12365: KAFKA-14020: Performance regression in Producer
artemlivshits commented on PR #12365: URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189712085 > Are the test failures related to the PR? Yes, just pushed the fix.
[GitHub] [kafka] junrao commented on pull request #12365: KAFKA-14020: Performance regression in Producer
junrao commented on PR #12365: URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189657885 @artemlivshits : Are the test failures related to the PR? @ijuma : Do you have any other comments?
[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034851

## clients/src/main/java/org/apache/kafka/common/network/Selector.java: ##
@@ -652,6 +652,11 @@ void write(KafkaChannel channel) throws IOException {
             if (send != null) {
                 this.completedSends.add(send);
                 this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
+
+                // To fix KAFKA-13559, i.e. select(timeout) blocks for 300 ms when the socket has no data but the buffer
+                // has data. Only happens when using SSL.
+                if (channel.hasBytesBuffered())
+                    madeReadProgressLastPoll = true;

Review Comment:
PR updated.. so I'll resolve this comment..
[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034643

## clients/src/main/java/org/apache/kafka/common/network/Selector.java: ##
@@ -652,6 +652,11 @@ void write(KafkaChannel channel) throws IOException {
             if (send != null) {
                 this.completedSends.add(send);
                 this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
+
+                // To fix KAFKA-13559, i.e. select(timeout) blocks for 300 ms when the socket has no data but the buffer
+                // has data. Only happens when using SSL.
+                if (channel.hasBytesBuffered())
+                    madeReadProgressLastPoll = true;

Review Comment:
@ijuma I have moved the fix to `pollSelectionKeys` as I mentioned before to make it less weird (hopefully).
[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.
badaiaqrandista commented on code in PR #12416:
URL: https://github.com/apache/kafka/pull/12416#discussion_r925034172

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   *          SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   *          should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)
+
+    // create server with SSL listener
+    val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props))
+    testableServer.enableRequestProcessing(Map.empty)
+    // dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector]

Review Comment:
done.. PR updated..

## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ##
@@ -1878,6 +1878,97 @@ class SocketServerTest {
     }, false)
   }
+  /**
+   * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition:
+   * 1. Client-Server communication uses SSL socket.
+   * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer.
+   *
+   * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that
+   * leads to this situation is the following (from the server point of view):
+   *
+   * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer
+   *          (SslTransportLayer.netReadBuffer).
+   * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer.
+   * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer.
+   * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from
+   *          SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll()
+   *          should not block for 300 ms.
+   *
+   * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly
+   * into SslTransportLayer.netReadBuffer and manually trigger the processing.
+   *
+   */
+  @Test
+  def testLatencyWithBufferedDataAndNoSocketData(): Unit = {
+    shutdownServerAndMetrics(server)
+
+    // to ensure we only have 1 connection (channel)
+    val props = sslServerProps
+    val numConnections = 1
+    props.put("max.connections.per.ip", numConnections.toString)

Review Comment:
done.. PR updated..
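The condition described for KAFKA-13559 (the socket has no data but a buffer still holds an unprocessed request) can be sketched as a timeout decision: when any channel still has buffered bytes, the next select should not block. This is a minimal sketch under that assumption and not the change made in PR #12416. `PollTimeout`, `effectiveTimeoutMs` and the `BufferedChannel` interface are hypothetical names; only `hasBytesBuffered` mirrors a method name that appears in the reviewed Selector diff.

```java
import java.util.List;

/** Hypothetical stand-in for a channel that may hold decrypted but unprocessed bytes. */
interface BufferedChannel {
    boolean hasBytesBuffered();
}

/** Hypothetical helper for illustration; not Kafka's Selector. */
class PollTimeout {
    /**
     * Returns 0 when any channel has buffered data, so that select() returns
     * immediately and the buffered request is processed without delay.
     * Otherwise returns the requested timeout unchanged.
     */
    static long effectiveTimeoutMs(long requestedMs, List<? extends BufferedChannel> channels) {
        for (BufferedChannel channel : channels) {
            if (channel.hasBytesBuffered()) {
                return 0L;
            }
        }
        return requestedMs;
    }
}
```

With a requested timeout of 300 ms, the sketch keeps 300 ms when no channel has buffered bytes and drops to 0 as soon as one does, which is the behaviour the test description expects for the second buffered request.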
[GitHub] [kafka] kirktrue closed pull request #12327: WIP - DO NOT MERGE - KIP-714 #718
kirktrue closed pull request #12327: WIP - DO NOT MERGE - KIP-714 #718 URL: https://github.com/apache/kafka/pull/12327
[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions
jnh5y commented on code in PR #12408: URL: https://github.com/apache/kafka/pull/12408#discussion_r924957846 ## streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.CloseOptions; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; + +@Category({IntegrationTest.class}) +public class KafkaStreamsCloseOptionsIntegrationTest { +@Rule +public Timeout globalTimeout = Timeout.seconds(600); +@Rule 
+public final TestName testName = new TestName(); +private static MockTime mockTime; + +@Rule +public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory()); + +protected static final String INPUT_TOPIC = "inputTopic"; +protected static final String OUTPUT_TOPIC = "outputTopic"; + +protected static final int STREAMS_CONSUMER_TIMEOUT = 2; Review Comment: @mjsax until we figure this out, we shouldn't merge this PR.
[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions
jnh5y commented on code in PR #12408: URL: https://github.com/apache/kafka/pull/12408#discussion_r924957060 ## streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java: ## @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KafkaStreams.CloseOptions;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup;
+
+@Category({IntegrationTest.class})
+public class KafkaStreamsCloseOptionsIntegrationTest {
+    @Rule
+    public Timeout globalTimeout = Timeout.seconds(600);
+    @Rule
+    public final TestName testName = new TestName();
+    private static MockTime mockTime;
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    protected static final String INPUT_TOPIC = "inputTopic";
+    protected static final String OUTPUT_TOPIC = "outputTopic";
+
+    protected static final int STREAMS_CONSUMER_TIMEOUT = 2;

Review Comment:
The test takes at least 20 seconds to run. This makes me wonder if the removal from the CG is working properly

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568732#comment-17568732 ]

Guozhang Wang commented on KAFKA-14024:
---

Thanks to [~aiquestion] for filing this and also submitting the PR. I've added you as a contributor and assigned the ticket to you too.

> Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
> --
>
>                 Key: KAFKA-14024
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14024
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.2.0
>            Reporter: Shawn Wang
>            Assignee: Shawn Wang
>            Priority: Blocker
>              Labels: new-consumer-threading-should-fix
>             Fix For: 3.3.0, 3.2.1
>
>
> Hi
> In https://issues.apache.org/jira/browse/KAFKA-13310, we tried to fix an issue
> where consumer#poll(duration) returns later than the provided duration.
> This happens because, if a rebalance is needed, we first try to commit the
> current offsets synchronously before rebalancing. If the offset commit takes
> too long, consumer#poll spends more time than the provided duration. To fix
> that, we changed the synchronous commit before rebalance (i.e. in
> onJoinPrepare) to an asynchronous commit.
>
> However, in this ticket, we found that the async commit keeps sending a new
> commit request during each Consumer#poll, because the offset commit never
> completes in time. The impact is that the existing consumer is kicked out of
> the group after the rebalance timeout without joining the group. That is,
> suppose we have consumer A in group G, and now consumer B joins the group;
> after the rebalance, only consumer B is in the group.
>
> The workaround for this issue is to change the assignor back to an eager
> assignor, e.g. StickyAssignor or RoundRobinAssignor.
>
> To fix the issue, we came up with 2 solutions:
> # We can explicitly wait for the async commit to complete in onJoinPrepare,
> but that would let the KAFKA-13310 issue happen again.
> # We can keep the async offset commit future in flight, so that on each
> Consumer#poll we wait for that future to complete.
>
> Besides, another bug was found while fixing this one. Before KAFKA-13310, we
> committed offsets synchronously with rebalanceTimeout, which retries on
> retriable errors until the timeout. After KAFKA-13310, we thought we had
> retries, but the retry happens after the partitions are revoked. That is,
> even if the retried offset commit succeeds, some partition offsets are still
> left uncommitted, and after the rebalance other consumers will consume
> overlapping records.
>
>
> ===
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L752]
>
> We didn't wait for the client to receive the commit offset response here, so
> onJoinPrepareAsyncCommitCompleted will be false in cooperative rebalance, and
> the client will loop invoking onJoinPrepare.
> I think the EAGER mode doesn't have this problem because it revokes the
> partitions even if onJoinPrepareAsyncCommitCompleted=false and will not try
> to commit in the next round.
>
> reproduce:
> * single node Kafka version 3.2.0 && client version 3.2.0
> * topic1 has 5 partitions
> * start a consumer1 (cooperative rebalance)
> * start another consumer2 (same consumer group)
> * consumer1 will hang for a long time before re-joining
> * from the server log, consumer1 hits the rebalance timeout before joinGroup
> and re-joins with another memberId
>
> consumer1's log keeps printing:
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xx-1, groupId=xxx] Executing onJoinPrepare with generation 54 and memberId consumer-xxx-1-fd3d04a8-009a-4ed1-949e-71b636716938 (ConsumerCoordinator.java:739)
> 16:59:16 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-xxx-1, groupId=xxx] Sending asynchronous auto-commit of offsets \{topic1-4=OffsetAndMetadata{offset=5, leaderEpoch=0, metadata=''}} (ConsumerCoordinator.java:1143)
>
> and coordinator's log:
> [2022-06-26 17:00:13,855] INFO [GroupCoordinator 0]: Preparing to rebalance group xxx in state PreparingRebalance with old generation 56 (__consumer_offsets-30) (reason: Adding new member consumer-xxx-1-fa7fe5ec-bd2f-42f6-b5d7-c5caeafe71ac with group instance id None; client reason: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException)) (kafka.coordinator.group.GroupCoordinator)
> [2022-06-26 17:00:43,855] INFO [GroupCoordinator 0]: Group xxx removed dynamic members who haven't joined: Set(consumer-xxx-1-d62a0923-6ca6-48dd-a84e-f97136d4603a)
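The workaround named in the issue description above (switching back to an eager assignor) is a consumer configuration change. A minimal sketch, assuming a plain `java.util.Properties` consumer configuration: the broker address is a placeholder, the class name `EagerAssignorWorkaround` is made up for illustration, and `partition.assignment.strategy` is the standard consumer config key.

```java
import java.util.Properties;

public class EagerAssignorWorkaround {
    // Builds consumer properties that select an eager assignor instead of a
    // cooperative one. Address and group id are placeholder values.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // assumed address
        props.setProperty("group.id", "xxx");                     // group name as in the logs
        // RoundRobinAssignor is one of the eager assignors named in the ticket.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```

These properties would then be passed to the consumer constructor together with the usual deserializer settings, which are omitted here.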
[jira] [Updated] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-14024:
--
    Reviewer: Luke Chen
[jira] [Assigned] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reassigned KAFKA-14024:
-
    Assignee: Guozhang Wang
[jira] [Assigned] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reassigned KAFKA-14024:
-
    Assignee: Shawn Wang  (was: Guozhang Wang)
[jira] [Commented] (KAFKA-14024) Consumer stuck during cooperative rebalance for Commit offset in onJoinPrepare
[ https://issues.apache.org/jira/browse/KAFKA-14024?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568731#comment-17568731 ]

Guozhang Wang commented on KAFKA-14024:
---

Hello [~mumrah], I took a look at the ticket and the PR (https://github.com/apache/kafka/pull/12349/files), and I agree with [~showuon] that this is a pretty bad regression that we should consider fixing asap, and hence worthy of being a blocker for 3.2.1.

As for the PR, personally I'd simplify it a bit compared with the current fix, to make `onJoinPrepare` more re-entrant and idempotent. More specifically, when the caller thread of `poll` enters `onJoinPrepare`, it would check whether there is already a commit in flight and whether it has completed; if there is none, it would send out the request and return from `onJoinPrepare` immediately, and hence return from the `poll` call as well. The next `poll` call would re-enter `onJoinPrepare` and check whether the commit request has completed; only once the maintained commit future has completed would it continue within the function to revoke partitions, trigger callbacks, etc. In this way we would not need a separate timer inside `onJoinPrepare` for the commit itself.

But since [~showuon] is almost done reviewing it, I think I would leave it to him rather than block on merging it. In the new rebalance protocol (KIP-848) we would have a much simpler model on the client side, so hopefully we would not fall into this awkward design pattern any more.
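The re-entrant `onJoinPrepare` suggested in the comment above can be sketched roughly as follows. This is not the ConsumerCoordinator code and not the merged fix: the class `ReentrantJoinPrepare`, the `commitSender` supplier and the `partitionsRevoked` flag are hypothetical stand-ins, and a failed commit is treated the same as a completed one for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ReentrantJoinPrepare {
    // Hypothetical hook that sends one asynchronous offset commit.
    private final Supplier<CompletableFuture<Void>> commitSender;
    // The commit future is kept across poll() calls instead of being re-sent.
    private CompletableFuture<Void> inFlightCommit;
    int commitsSent = 0;
    boolean partitionsRevoked = false;

    ReentrantJoinPrepare(Supplier<CompletableFuture<Void>> commitSender) {
        this.commitSender = commitSender;
    }

    // Returns true once preparation is finished; false means the caller
    // should return from poll() and re-enter on the next poll().
    boolean onJoinPrepare() {
        if (inFlightCommit == null) {
            inFlightCommit = commitSender.get(); // send at most one commit
            commitsSent++;
        }
        if (!inFlightCommit.isDone()) {
            return false; // commit still in flight, nothing else to do yet
        }
        inFlightCommit = null;
        partitionsRevoked = true; // stands in for revocation and callbacks
        return true;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        ReentrantJoinPrepare coordinator = new ReentrantJoinPrepare(() -> pending);
        System.out.println(coordinator.onJoinPrepare()); // prints false
        System.out.println(coordinator.onJoinPrepare()); // prints false
        pending.complete(null);
        System.out.println(coordinator.onJoinPrepare()); // prints true
        System.out.println(coordinator.commitsSent);     // prints 1
    }
}
```

Because the future is stored rather than recreated, repeated `poll` calls wait on the same commit instead of issuing a new request each time, which is the looping behaviour the ticket reports.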
[GitHub] [kafka] jsancio commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log
jsancio commented on PR #12274:
URL: https://github.com/apache/kafka/pull/12274#issuecomment-1189538831

> @jsancio I think the last question is that there may be more than one registration record for each broker after restarting it, so can we rely on the broker epoch? I think I need some time to check the logic before make a final decision.

Yes. I think you can rely on broker epoch and broker id. Also the active controller is guaranteed to have read all of the records on the log before handling RPCs like heartbeat.
[GitHub] [kafka] ableegoldman commented on pull request #12324: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used
ableegoldman commented on PR #12324:
URL: https://github.com/apache/kafka/pull/12324#issuecomment-1189528697

Also cherrypicked back to the 3.3 branch still code freeze deadline is tomorrow
[GitHub] [kafka] ableegoldman commented on pull request #12324: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used
ableegoldman commented on PR #12324:
URL: https://github.com/apache/kafka/pull/12324#issuecomment-1189526095

Merged to trunk, thanks @wcarlson5
[GitHub] [kafka] ableegoldman merged pull request #12324: KAFKA-12699: Override the default handler for stream threads if the stream's handler is used
ableegoldman merged PR #12324:
URL: https://github.com/apache/kafka/pull/12324
[GitHub] [kafka] ableegoldman commented on pull request #12324: KAFKA-12699: override the default handler for stream threads if the stream's handl…
ableegoldman commented on PR #12324:
URL: https://github.com/apache/kafka/pull/12324#issuecomment-1189524621

Test failures are unrelated, ARM build did time out but I've seen this happen on other PRs recently and the build page shows the actual test steps passing (error message in failed step is just ` [Checks API] No suitable checks publisher found`) so I believe it to be unrelated as well. Seems good to merge
[GitHub] [kafka] badaiaqrandista commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.
badaiaqrandista commented on code in PR #12416: URL: https://github.com/apache/kafka/pull/12416#discussion_r924908020 ## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ## @@ -1878,6 +1878,97 @@ class SocketServerTest { }, false) } + /** + * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition: + * 1. Client-Server communication uses SSL socket. + * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer. + * + * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that + * leads to this situation is the following (from the server point of view): + * + * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer + * (SslTransportLayer.netReadBuffer). + * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer. + * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer. + * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from + * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll() + * should not block for 300 ms. + * + * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly + * into SslTransportLayer.netReadBuffer and manually trigger the processing. 
+ * + */ + @Test + def testLatencyWithBufferedDataAndNoSocketData(): Unit = { +shutdownServerAndMetrics(server) + +// to ensure we only have 1 connection (channel) +val props = sslServerProps +val numConnections = 1 +props.put("max.connections.per.ip", numConnections.toString) + +// create server with SSL listener +val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props)) +testableServer.enableRequestProcessing(Map.empty) +// dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector] +val testableSelector = testableServer.testableSelector +val proxyServer = new ProxyServer(testableServer) + +try { + // trigger SSL handshake by sending the first request and receiving its response without buffering + val requestBytes = producerRequestBytes() + val sslSocket = sslClientSocket(proxyServer.localPort) + + sendRequest(sslSocket, requestBytes) + val request1 = receiveRequest(testableServer.dataPlaneRequestChannel) + processRequest(testableServer.dataPlaneRequestChannel, request1) + receiveResponse(sslSocket) + + // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer + val connectionId = request1.context.connectionId + val listener = testableServer.config.dataPlaneListeners.head.listenerName.value + val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found")) + val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer") + val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer") + + proxyServer.enableBuffering(netReadBuffer) + sendRequest(sslSocket, requestBytes) + sendRequest(sslSocket, requestBytes) + + val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead") + keysWithBufferedRead.add(channel.selectionKey) + 
JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true) + + // process the first request in the server side + // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request + processRequest(testableServer.dataPlaneRequestChannel) + + // receive response in the client side + receiveResponse(sslSocket) + + // process the second request in the server side + // this would process the second request in the appReadBuffer + // NOTE: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559, + // this step will take more than 300 ms + val processTimeStart = System.currentTimeMillis() + processRequest(testableServer.dataPlaneRequestChannel) + val processTimeEnd = System.currentTimeMillis() + + // receive response in the client side + receiveResponse(sslSocket) Review Comment: @splett2 Can you refresh the PR? I have added timeout override and changed currentTimeMillis to nanoTime. So this comment is against an older version of the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To
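The switch from `currentTimeMillis` to `nanoTime` mentioned in the review comment can be illustrated with a minimal sketch. `System.nanoTime()` is intended for measuring elapsed time, whereas `System.currentTimeMillis()` follows the wall clock and may jump. The class name, the helper, and the empty action below are hypothetical stand-ins, not the code in the PR.

```java
import java.util.concurrent.TimeUnit;

final class ElapsedTimeSketch {
    /** Runs the action and returns the elapsed time in milliseconds, measured with nanoTime. */
    static long elapsedMillis(Runnable action) {
        long start = System.nanoTime();
        action.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for the second processRequest(...) call in the test.
        long elapsed = elapsedMillis(() -> { });
        System.out.println("elapsed ms: " + elapsed + ", under 300 ms: " + (elapsed < 300));
    }
}
```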
[jira] [Assigned] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-12463: - Assignee: (was: Chris Egerton) > Update default consumer partition assignor for sink tasks > - > > Key: KAFKA-12463 > URL: https://issues.apache.org/jira/browse/KAFKA-12463 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Priority: Major > Labels: needs-kip > > Kafka consumers have a pluggable [partition assignment > interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] > that comes with several out-of-the-box implementations including the > [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], > > [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], > > [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], > and > [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. > If no partition assignor is configured with a consumer, the {{RangeAssignor}} > is used by default. Although there are some benefits to this assignor > including stability of assignment across generations and simplicity of > design, it comes with a major drawback: the number of active consumers in a > group is limited to the number of partitions in the topic(s) with the most > partitions. For an example of the worst case, in a consumer group where every > member is subscribed to ten topics that each have one partition, only one > member of that group will be assigned any topic partitions. 
> This can end up producing counterintuitive and even frustrating behavior when > a sink connector is brought up with N tasks to read from some collection of > topics with a total of N topic partitions, but some tasks end up idling and > not processing any data. > h3. Proposed Change > *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below > will not work as consumers will still perform eager rebalancing as long as at > least one of the partition assignors they are configured with does not > support cooperative rebalancing. KAFKA-12487 should also be addressed before > configuring any Connect worker to use the {{CooperativeStickyAssignor}} for > any sink connectors.* > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > introduced the {{CooperativeStickyAssignor}}, which seeks to provide a > stable assignment across generations wherever possible, provide the most even > assignment possible (taking into account possible differences in > subscriptions across consumers in the group), and allow consumers to continue > processing data during rebalance. The documentation for the assignor states > that "Users should prefer this assignor for newer clusters." As Connect and > the tooling around it matures and automatic restarts of failed tasks become > more popular, care should be taken to ensure that the consumer group churn > created by restarting one or more tasks doesn't compromise the availability > of other tasks by forcing them to temporarily yield up all of their > partitions just to reclaim them after a rebalance has completed. > With that in mind, we should alter the default consumer configuration for > sink tasks to use the new {{CooperativeStickyAssignor}}. 
In order to do this > in a backwards-compatible fashion that also enables rolling upgrades, this > should be implemented by changing the {{Worker}} to set the following on the > consumer configuration created for each sink connector task: > {code:java} > partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor > {code} > This way, consumer groups for sink connectors on Connect clusters in the > process of being upgraded will continue to use the {{RangeAssignor}} until > all workers in the cluster have been upgraded, and then will switch over to > the new {{CooperativeStickyAssignor}} automatically. > Importantly, this setting will only be a default, and any user-specified > overrides either in the *worker config*: > > {code:java} > consumer.partition.assignment.strategy={code} > > or in the *connector config*: > > {code:java} > "consumer.override.partition.assignment.strategy": " strategy>"{code} > > will still be respected. > This improvement is viable as far back as -2.3- 2.4, when the > {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug > fix,
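The worst case described in the issue (ten single-partition topics, with only one member receiving partitions) can be reproduced with a small simulation. This is a simplified sketch of range-style assignment that assumes every consumer subscribes to every topic. It is not the `RangeAssignor` source, and all names in it are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RangeAssignmentSketch {
    /**
     * Splits each topic's partitions into contiguous ranges, one topic at a time.
     * Consumers are expected in sorted order; earlier consumers receive any remainder.
     */
    static Map<String, List<String>> assign(List<String> consumers, Map<String, Integer> partitionsPerTopic) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        int n = consumers.size();
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int base = topic.getValue() / n;   // partitions every consumer gets for this topic
            int extra = topic.getValue() % n;  // leftovers go to the first consumers
            int next = 0;
            for (int i = 0; i < n; i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int p = 0; p < count; p++) {
                    assignment.get(consumers.get(i)).add(topic.getKey() + "-" + next++);
                }
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new LinkedHashMap<>();
        for (int t = 0; t < 10; t++) {
            topics.put("topic" + t, 1); // ten topics, one partition each
        }
        Map<String, List<String>> result = assign(Arrays.asList("task-0", "task-1", "task-2"), topics);
        result.forEach((consumer, parts) -> System.out.println(consumer + " -> " + parts.size() + " partitions"));
    }
}
```

Because the split is done per topic, the first consumer wins the single partition of every topic, so the other two tasks stay idle.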
[jira] [Assigned] (KAFKA-12476) Worker can block for longer than scheduled rebalance delay and/or session key TTL
[ https://issues.apache.org/jira/browse/KAFKA-12476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton reassigned KAFKA-12476: - Assignee: (was: Chris Egerton) > Worker can block for longer than scheduled rebalance delay and/or session key > TTL > - > > Key: KAFKA-12476 > URL: https://issues.apache.org/jira/browse/KAFKA-12476 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.2, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2, 3.0.0 >Reporter: Chris Egerton >Priority: Major > > Near the end of a distributed worker's herder tick loop, it calculates how > long it should poll for rebalance activity before beginning a new loop. See > [here|https://github.com/apache/kafka/blob/8da65936d7fc53d24c665c0d01893d25a430933b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L399-L409] > and > [here|https://github.com/apache/kafka/blob/8da65936d7fc53d24c665c0d01893d25a430933b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L459]. > In between then and when it begins polling for rebalancing activity, some > connector and task (re-)starts take place. While this normally completes in > at most a minute or two, an overloaded cluster or one in the midst of garbage > collection may take longer. See > [here|https://github.com/apache/kafka/blob/8da65936d7fc53d24c665c0d01893d25a430933b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L411-L452]. > The worker should calculate the time to poll for rebalance activity as > closely as possible to when it actually begins that polling. -- This message was sent by Atlassian Jira (v8.20.10#820010)
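The ordering problem described in the issue can be sketched with a fake clock. The snippet below is an illustrative assumption about the shape of the fix, not the `DistributedHerder` code, and every name in it is hypothetical. It contrasts fixing the poll timeout before the (re-)start work with deriving it immediately before polling.

```java
import java.util.function.LongSupplier;

final class PollTimeoutSketch {
    /** Time left until the deadline, never negative. */
    static long remainingMs(long deadlineMs, long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    /** Problematic ordering: the timeout is fixed before the pre-poll work runs. */
    static long timeoutComputedEarly(LongSupplier clock, long deadlineMs, Runnable prePollWork) {
        long timeout = remainingMs(deadlineMs, clock.getAsLong());
        prePollWork.run();
        return timeout;
    }

    /** Suggested ordering: run the work first, then derive the timeout just before polling. */
    static long timeoutComputedLate(LongSupplier clock, long deadlineMs, Runnable prePollWork) {
        prePollWork.run();
        return remainingMs(deadlineMs, clock.getAsLong());
    }

    public static void main(String[] args) {
        long[] now = {400L};                       // fake clock, in milliseconds
        Runnable slowWork = () -> now[0] += 300L;  // pretend the restarts took 300 ms
        System.out.println(timeoutComputedEarly(() -> now[0], 1000L, slowWork)); // 600
        now[0] = 400L;
        System.out.println(timeoutComputedLate(() -> now[0], 1000L, slowWork));  // 300
    }
}
```

With the early computation the worker would poll until 1300 ms on this clock, overshooting the 1000 ms deadline by exactly the time spent in the restart work.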
[jira] [Assigned] (KAFKA-8299) Add type-safe instantiation of generic classes to AbstractConfig
[ https://issues.apache.org/jira/browse/KAFKA-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton reassigned KAFKA-8299:
    Assignee: (was: Chris Egerton)

> Add type-safe instantiation of generic classes to AbstractConfig
>
> Key: KAFKA-8299
> URL: https://issues.apache.org/jira/browse/KAFKA-8299
> Project: Kafka
> Issue Type: Improvement
> Components: config
> Reporter: Chris Egerton
> Priority: Minor
>
> {{AbstractConfig.getConfiguredInstance(String key, Class<T> klass)}} and other similar methods aren't type-safe for generic types. For example, the following code compiles but generates a runtime exception when the created {{Consumer}} is invoked:
>
> {code:java}
> import java.util.Collections;
> import java.util.function.Consumer;
> import org.apache.kafka.common.config.AbstractConfig;
> import org.apache.kafka.common.config.ConfigDef;
>
> public class KafkaIssueSnippet {
>     public static class PrintInt implements Consumer<Integer> {
>         @Override
>         public void accept(Integer i) {
>             System.out.println(i);
>         }
>     }
>     public static void main(String[] args) {
>         final String stringConsumerProp = "string.consumer.class";
>         AbstractConfig config = new AbstractConfig(
>             new ConfigDef().define(
>                 stringConsumerProp,
>                 ConfigDef.Type.CLASS,
>                 ConfigDef.Importance.HIGH,
>                 "A class that implements Consumer<String>"
>             ),
>             Collections.singletonMap(
>                 stringConsumerProp,
>                 PrintInt.class.getName()
>             )
>         );
>         // Unchecked assignment: the raw Consumer returned here is really a Consumer<Integer>.
>         Consumer<String> stringConsumer = config.getConfiguredInstance(
>             stringConsumerProp,
>             Consumer.class
>         );
>         stringConsumer.accept("Oops! ClassCastException");
>     }
> }{code}
> The compiler (rightfully so) generates a warning about the unchecked cast from {{Consumer}} to {{Consumer<String>}} to indicate that exactly this sort of thing may happen, but it would be nice if we didn't have to worry about this in the first place and instead had the same guarantees for generic types that we do for non-generic types: that either the {{getConfiguredInstance(...)}} method returns an object to us that we know for sure is an instance of the requested type, or an exception is thrown.
> Apache Commons contains a useful reflection library that could possibly be used to bridge this gap; specifically, its [TypeUtils|https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/reflect/TypeUtils.html] and [TypeLiteral|https://commons.apache.org/proper/commons-lang/apidocs/org/apache/commons/lang3/reflect/TypeLiteral.html] classes could be used to add new {{getConfiguredInstance}} and {{getConfiguredInstances}} methods to the {{AbstractConfig}} class that accept instances of {{TypeLiteral}} instead of {{Class}} and then perform type checking to ensure that the requested class actually implements/extends from the requested type.
>
> Since this affects public API it's possible a KIP will be required, but the changes are pretty lightweight (four new methods that heavily resemble existing ones). If a contributor or committer, especially one familiar with this section of the codebase, has an opinion on the necessity of a KIP their input would be appreciated.
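The kind of check the issue asks for can be sketched with the JDK's own reflection API. The snippet below is a deliberately narrow illustration: it inspects only interfaces implemented directly by the class, and it is not the Commons Lang `TypeUtils` approach proposed in the issue. The class and method names are hypothetical.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;

final class GenericTypeCheckSketch {
    /** Same shape as the PrintInt class from the issue. */
    static final class PrintInt implements Consumer<Integer> {
        @Override
        public void accept(Integer i) {
            System.out.println(i);
        }
    }

    /**
     * Returns true if klass directly implements rawInterface with exactly typeArg
     * as its single type argument. Superclasses and inherited interfaces are ignored.
     */
    static boolean implementsWithArg(Class<?> klass, Class<?> rawInterface, Class<?> typeArg) {
        for (Type implemented : klass.getGenericInterfaces()) {
            if (implemented instanceof ParameterizedType) {
                ParameterizedType parameterized = (ParameterizedType) implemented;
                Type[] typeArgs = parameterized.getActualTypeArguments();
                if (parameterized.getRawType() == rawInterface
                        && typeArgs.length == 1
                        && typeArgs[0] == typeArg) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(implementsWithArg(PrintInt.class, Consumer.class, Integer.class)); // true
        System.out.println(implementsWithArg(PrintInt.class, Consumer.class, String.class));  // false
    }
}
```

A config method built on such a check could throw at instantiation time when the second call returns false, instead of deferring the failure to the first `accept` call.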
[GitHub] [kafka] C0urante commented on a diff in pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito
C0urante commented on code in PR #12422:
URL: https://github.com/apache/kafka/pull/12422#discussion_r924890270

## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java: ##
@@ -144,10 +149,6 @@ public void subscribe(String path, Set keys, ConfigChangeCallback callba
             throw new UnsupportedOperationException();
         }

-        public void unsubscribe(String path, Set keys) {
-            throw new UnsupportedOperationException();
-        }
-

Review Comment:
Nice catch, took me a second to figure out the misalignment between the type signatures for this method and for `ConfigProvider::unsubscribe`. Should we also remove the `subscribe` method, since it matches the [default implementation](https://github.com/apache/kafka/blob/693e283802590b724ef441d5bf7acb6eeced91c5/clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java#L59-L61) for `ConfigProvider::subscribe`? Also, should we annotate any remaining methods with `@Override` to help prevent future mistakes like the one you've caught here?
## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java: ## @@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest { public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL"; public static final String TEST_RESULT_WITH_LONGER_TTL = "testResultWithLongerTTL"; -@Mock private Herder herder; -@Mock private Worker worker; -@Mock private HerderRequest requestId; +private final Herder herder = Mockito.mock(Herder.class); +private final Worker worker = Mockito.mock(Worker.class); +private final HerderRequest requestId = Mockito.mock(HerderRequest.class); private WorkerConfigTransformer configTransformer; @Before public void setup() { -worker = PowerMock.createMock(Worker.class); -herder = PowerMock.createMock(Herder.class); configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider())); } @Test public void testReplaceVariable() { +// Execution Map result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}")); + +// Assertions assertEquals(TEST_RESULT, result.get(MY_KEY)); } @Test public void testReplaceVariableWithTTL() { -EasyMock.expect(worker.herder()).andReturn(herder); - -replayAll(); +// Setup +when(worker.herder()).thenReturn(herder); +// Execution Map props = new HashMap<>(); props.put(MY_KEY, "${test:testPath:testKeyWithTTL}"); props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE); Map result = configTransformer.transform(MY_CONNECTOR, props); + +// Assertions +assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); Review Comment: Good catch ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java: ## @@ -53,64 +50,72 @@ public class WorkerConfigTransformerTest { public static final String TEST_RESULT_WITH_TTL = "testResultWithTTL"; public static final String TEST_RESULT_WITH_LONGER_TTL = "testResultWithLongerTTL"; -@Mock private Herder herder; 
-@Mock private Worker worker; -@Mock private HerderRequest requestId; +private final Herder herder = Mockito.mock(Herder.class); +private final Worker worker = Mockito.mock(Worker.class); +private final HerderRequest requestId = Mockito.mock(HerderRequest.class); private WorkerConfigTransformer configTransformer; @Before public void setup() { -worker = PowerMock.createMock(Worker.class); -herder = PowerMock.createMock(Herder.class); configTransformer = new WorkerConfigTransformer(worker, Collections.singletonMap("test", new TestConfigProvider())); } @Test public void testReplaceVariable() { +// Execution Map result = configTransformer.transform(MY_CONNECTOR, Collections.singletonMap(MY_KEY, "${test:testPath:testKey}")); + +// Assertions assertEquals(TEST_RESULT, result.get(MY_KEY)); } @Test public void testReplaceVariableWithTTL() { -EasyMock.expect(worker.herder()).andReturn(herder); - -replayAll(); +// Setup +when(worker.herder()).thenReturn(herder); +// Execution Map props = new HashMap<>(); props.put(MY_KEY, "${test:testPath:testKeyWithTTL}"); props.put(CONFIG_RELOAD_ACTION_CONFIG, CONFIG_RELOAD_ACTION_NONE); Map result = configTransformer.transform(MY_CONNECTOR, props); + +// Assertions +assertEquals(TEST_RESULT_WITH_TTL, result.get(MY_KEY)); } @Test public void
[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-10000: System tests (KIP-618)
C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1189451124

I made a small adjustment to the shutdown logic for the tests, which ensures that the worker process is actually dead before trying to bring up a new one. This should not affect the accuracy of the tests, but it did solve the above-mentioned issue with workers failing to restart because port 8083 on their container was still in use.

I also corrected a small rebase error caused by the recent changes to remove the file connectors from the default Connect setup, a test failure caused by the new source connector properties, and a minor bug in the `VerifiableSourceTask` class that can cause a `NullPointerException` during shutdown.

With these changes, I was able to get a complete green run of the set of tests under the `tests/kafkatest/tests/connect` directory. I've also kicked off a local run of the `test_exactly_once_source` test with unclean shutdown and the `sessioned` protocol; going to try to let that go for the rest of the day on repeat. Will report any non-spurious failures.

@showuon if you have time, would you mind giving a local run of these tests another try?
[GitHub] [kafka] mjsax commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions
mjsax commented on code in PR #12408: URL: https://github.com/apache/kafka/pull/12408#discussion_r924838158 ## streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java: ## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.CloseOptions; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; + +@Category({IntegrationTest.class}) +public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest { +@Rule +public Timeout globalTimeout = Timeout.seconds(600); +@Rule +public 
final TestName testName = new TestName(); +private static MockTime mockTime; + +@Rule +public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory()); + +protected static final String INPUT_TOPIC = "inputTopic"; +protected static final String OUTPUT_TOPIC = "outputTopic"; +private static final String OUTPUT_TOPIC_2 = "outputTopic2"; +private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun"; + +protected static final int STREAMS_CONSUMER_TIMEOUT = 2000; +protected static final int TIMEOUT_MULTIPLIER = 15; + +protected Properties streamsConfig; +protected static KafkaStreams streams; +protected static Admin adminClient; +protected Properties commonClientConfig; +private Properties producerConfig; +protected Properties resultConsumerConfig; + +public static final EmbeddedKafkaCluster CLUSTER; + +static { +final Properties brokerProps = new Properties(); +CLUSTER = new EmbeddedKafkaCluster(1, brokerProps); +} + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + +@Before +public void before() throws Exception { +mockTime = CLUSTER.time; + +final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + +commonClientConfig = new Properties(); +commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + +streamsConfig = new Properties(); +streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); +streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); +streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer
junrao commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924818200

## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ##
@@ -1465,13 +1465,21 @@ public boolean isDone() {
     private class AppendCallbacks implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors interceptors;
-        private final ProducerRecord record;
-        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private final String topic;
+        private final Integer recordPartition;
+        private final String recordLogString;
+        private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private volatile TopicPartition topicPartition;

         private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.record = record;
+            // Extract record info as we don't want to keep a reference to the record during
+            // whole lifetime of the batch.

Review Comment:
Thanks for the explanation. Sounds good.
[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1189398064

@dengziming I have made the changes requested.

I have also made the changes in `QuorumController.maybeGenerateSnapshot` to log the reason for the snapshot being generated. `maybeGenerateSnapshot` seemed to be a better place than `QuorumController.SnapshotGeneratorManager.createSnapshotGenerator` since it already had logs for starting a snapshot.

Also, it seems like `QuorumController` only creates a snapshot when max bytes are exceeded; I couldn't find it calling the snapshot generator because the metadata version changed.
[GitHub] [kafka] ashmeet13 commented on a diff in pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on code in PR #12414:
URL: https://github.com/apache/kafka/pull/12414#discussion_r924807677

## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ##
@@ -89,6 +89,12 @@ class BrokerMetadataListener(
    */
   private var _bytesSinceLastSnapshot: Long = 0L

+  /**
+   * The reason as to why we are calling maybeStartSnapshot, can be either
+   * MaxBytesExceeded or MetadataVersionChanged
+   */
+  private var _reasonForSnapshot: String = ""

Review Comment:
Thanks, I have made the change. I have handled it by returning a `Tuple` from the `shouldSnapshot` function. I am new to Scala; if there is another recommended way of doing it, please let me know!

## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ##
@@ -128,7 +134,15 @@ class BrokerMetadataListener(
   }

   private def shouldSnapshot(): Boolean = {
-    (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()
+    if (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
+      _reasonForSnapshot = "MaxBytesExceeded"

Review Comment:
Got it, made this change
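The idea of returning the reason together with the decision, instead of storing it in a mutable field, can be sketched as follows. The PR itself is Scala and returns a tuple. This Java analogue uses `Optional<String>` purely for illustration, and the class and method names are hypothetical. Only the two reason strings come from the diff.

```java
import java.util.Optional;

final class SnapshotReasonSketch {
    /**
     * Returns the reason a snapshot should be taken, or an empty Optional if none is needed.
     * A present value plays the role of "true plus reason" in a tuple-returning version.
     */
    static Optional<String> snapshotReason(long bytesSinceLastSnapshot,
                                           long maxBytesBetweenSnapshots,
                                           boolean metadataVersionChanged) {
        if (bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) {
            return Optional.of("MaxBytesExceeded");
        }
        if (metadataVersionChanged) {
            return Optional.of("MetadataVersionChanged");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        snapshotReason(2048L, 1024L, false)
            .ifPresent(reason -> System.out.println("Starting snapshot, reason: " + reason));
    }
}
```

Keeping the reason in the return value means no state has to be reset between calls, which is the main drawback of a field such as `_reasonForSnapshot`.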
[GitHub] [kafka] guozhangwang commented on pull request #12391: [DO NOT MERGE] KAFKA-10199: Add task updater metrics
guozhangwang commented on PR #12391:
URL: https://github.com/apache/kafka/pull/12391#issuecomment-1189383814

@cadonna please lmk wdyt about the proposed metric changes in the descriptions.
[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer
artemlivshits commented on code in PR #12365:
URL: https://github.com/apache/kafka/pull/12365#discussion_r924801253

## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ##
@@ -1465,13 +1465,21 @@ public boolean isDone() {
     private class AppendCallbacks implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors interceptors;
-        private final ProducerRecord record;
-        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private final String topic;
+        private final Integer recordPartition;
+        private final String recordLogString;
+        private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
+        private volatile TopicPartition topicPartition;

         private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.record = record;
+            // Extract record info as we don't want to keep a reference to the record during
+            // whole lifetime of the batch.

Review Comment:
I think it applies to all 3 fields: topic, recordPartition and recordLogString - we extract all this info from the record, so the comment is before we do that (in the PR it's kind of hard to see because of the inline discussion). Let me know if you think otherwise.
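The extraction discussed in this thread can be sketched in isolation: the callback state copies the few small values it needs and drops the record, so a potentially large payload is not kept reachable for the lifetime of the batch. The `Record` and `CallbackState` types and the log-string format below are hypothetical stand-ins, not the `KafkaProducer` code. Only the three field names come from the diff.

```java
final class AppendCallbackSketch {
    /** Hypothetical stand-in for a producer record carrying a potentially large payload. */
    static final class Record {
        final String topic;
        final Integer partition;
        final byte[] value;

        Record(String topic, Integer partition, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.value = value;
        }
    }

    /** Holds only the extracted values; it never stores the Record itself. */
    static final class CallbackState {
        final String topic;
        final Integer recordPartition;
        final String recordLogString;

        CallbackState(Record record) {
            // Extract everything needed up front so the record can be garbage-collected.
            this.topic = record.topic;
            this.recordPartition = record.partition;
            this.recordLogString = "topic=" + record.topic
                + ", partition=" + record.partition
                + ", valueSize=" + record.value.length;
        }
    }

    public static void main(String[] args) {
        CallbackState state = new CallbackState(new Record("orders", 3, new byte[16]));
        System.out.println(state.recordLogString);
    }
}
```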
[GitHub] [kafka] mjsax commented on pull request #12421: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handle…
mjsax commented on PR #12421:
URL: https://github.com/apache/kafka/pull/12421#issuecomment-1189369026

Thanks for the PR. Merged to `trunk` and cherry-picked to `3.3`, `3.2` branches. (Did not cherry-pick cleanly to `3.1` -- can you do a follow up PR to backport it?)
[jira] [Commented] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE
[ https://issues.apache.org/jira/browse/KAFKA-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568659#comment-17568659 ]

Matthias J. Sax commented on KAFKA-12887:

This feature broke some things, so we reverted it: [https://github.com/apache/kafka/pull/12421]

Reverted in 3.3.0, 3.2.1.

> Do not trigger user-customized ExceptionalHandler for RTE
>
> Key: KAFKA-12887
> URL: https://issues.apache.org/jira/browse/KAFKA-12887
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Guozhang Wang
> Assignee: Josep Prat
> Priority: Major
> Fix For: 3.1.0
>
> Today in StreamThread we have a try-catch block that captures all {{Throwable e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. However, there are possible RTEs such as IllegalState/IllegalArgument exceptions which are usually caused by bugs, etc. In such cases we should not let users decide what to do with these exceptions, but should let Streams itself enforce the decision, e.g. for IllegalState/IllegalArgument exceptions we should fail fast to notify the user of the potential error.
[GitHub] [kafka] jnh5y commented on a diff in pull request #12408: KAFKA-14076 Fix issues with KafkaStreams.CloseOptions
jnh5y commented on code in PR #12408: URL: https://github.com/apache/kafka/pull/12408#discussion_r924785404 ## streams/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsCloseOptionsIntegrationTest.java: ## @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.CloseOptions; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Properties; + +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForEmptyConsumerGroup; + +@Category({IntegrationTest.class}) +public class KafkaStreamsCloseOptionsIntegrationTest { //extends AbstractResetIntegrationTest { +@Rule +public Timeout globalTimeout = Timeout.seconds(600); +@Rule +public 
final TestName testName = new TestName(); +private static MockTime mockTime; + +@Rule +public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory()); + +protected static final String INPUT_TOPIC = "inputTopic"; +protected static final String OUTPUT_TOPIC = "outputTopic"; +private static final String OUTPUT_TOPIC_2 = "outputTopic2"; +private static final String OUTPUT_TOPIC_2_RERUN = "outputTopic2_rerun"; + +protected static final int STREAMS_CONSUMER_TIMEOUT = 2000; +protected static final int TIMEOUT_MULTIPLIER = 15; + +protected Properties streamsConfig; +protected static KafkaStreams streams; +protected static Admin adminClient; +protected Properties commonClientConfig; +private Properties producerConfig; +protected Properties resultConsumerConfig; + +public static final EmbeddedKafkaCluster CLUSTER; + +static { +final Properties brokerProps = new Properties(); +CLUSTER = new EmbeddedKafkaCluster(1, brokerProps); +} + +@BeforeClass +public static void startCluster() throws IOException { +CLUSTER.start(); +} + +@AfterClass +public static void closeCluster() { +CLUSTER.stop(); +} + +@Before +public void before() throws Exception { +mockTime = CLUSTER.time; + +final String appID = IntegrationTestUtils.safeUniqueTestName(getClass(), testName); + +commonClientConfig = new Properties(); +commonClientConfig.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + +streamsConfig = new Properties(); +streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath()); +streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); +streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer
junrao commented on code in PR #12365: URL: https://github.com/apache/kafka/pull/12365#discussion_r924779033 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1465,13 +1465,21 @@ public boolean isDone() { private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; -private final ProducerRecord record; -protected int partition = RecordMetadata.UNKNOWN_PARTITION; +private final String topic; +private final Integer recordPartition; +private final String recordLogString; +private volatile int partition = RecordMetadata.UNKNOWN_PARTITION; +private volatile TopicPartition topicPartition; private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; -this.record = record; +// Extract record info as we don't want to keep a reference to the record during +// whole lifetime of the batch. Review Comment: Could we move these two lines to the immediate line before where we set recordPartition? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
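The diff above copies what the callback needs out of the record in the constructor instead of keeping the record itself. A minimal sketch of that pattern follows, assuming hypothetical stand-in classes: `SketchRecord` and `SketchAppendCallbacks` are illustrative names, not Kafka types, and only the field names `topic`, `recordPartition`, `recordLogString` and `partition` come from the diff.

```java
// Hypothetical stand-in for ProducerRecord; only what the sketch needs is modeled.
final class SketchRecord {
    private final String topic;
    private final Integer partition; // null when the caller leaves partitioning to the producer
    private final byte[] payload;    // potentially large; the part worth releasing early

    SketchRecord(String topic, Integer partition, byte[] payload) {
        this.topic = topic;
        this.partition = partition;
        this.payload = payload;
    }

    String topic() { return topic; }
    Integer partition() { return partition; }

    @Override
    public String toString() {
        return "SketchRecord(topic=" + topic + ", partition=" + partition
                + ", bytes=" + payload.length + ")";
    }
}

// Hypothetical callback holder that stays reachable for as long as the batch does.
final class SketchAppendCallbacks {
    private final String topic;
    private final Integer recordPartition;
    private final String recordLogString;
    private volatile int partition = -1; // -1 stands in for "unknown partition" in this sketch

    SketchAppendCallbacks(SketchRecord record, boolean traceEnabled) {
        // Assumption: a null record means a caller bug, so stay null-safe here
        // and let later error reporting run instead of throwing an NPE.
        this.topic = record != null ? record.topic() : null;
        this.recordPartition = record != null ? record.partition() : null;
        // Build the log string only when tracing is on, to avoid the cost otherwise.
        this.recordLogString = traceEnabled && record != null ? record.toString() : "";
        // No field refers to the record, so it can be collected once the caller drops it.
    }

    void setPartition(int partition) { this.partition = partition; }

    String topic() { return topic; }
    Integer recordPartition() { return recordPartition; }
    String recordLogString() { return recordLogString; }
    int partition() { return partition; }
}
```

Because no field has the record's type, the holder cannot extend the record's lifetime to that of the batch, which is the concern the code comment in the diff describes.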
[GitHub] [kafka] jsancio commented on pull request #12365: KAFKA-14020: Performance regression in Producer
jsancio commented on PR #12365: URL: https://github.com/apache/kafka/pull/12365#issuecomment-1189361061 > @jsancio : We plan to cherry-pick this to 3.3 branch since this fixes a performance issue in [KAFKA-10888](https://issues.apache.org/jira/browse/KAFKA-10888). Sounds good @junrao . I set the fix version for KAFKA-14020 to 3.3.0.
[jira] [Updated] (KAFKA-14020) Performance regression in Producer
[ https://issues.apache.org/jira/browse/KAFKA-14020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-14020: --- Fix Version/s: 3.3.0 > Performance regression in Producer > -- > > Key: KAFKA-14020 > URL: https://issues.apache.org/jira/browse/KAFKA-14020 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 3.3.0 > Reporter: John Roesler > Assignee: Artem Livshits > Priority: Blocker > Fix For: 3.3.0 > > > [https://github.com/apache/kafka/commit/f7db6031b84a136ad0e257df722b20faa7c37b8a] > introduced a 10% performance regression in the KafkaProducer under a default > config. > > The context for this result is a benchmark that we run for Kafka Streams. The > benchmark provisions 5 independent AWS clusters, including one broker node on > an i3.large and one client node on an i3.large. During a benchmark run, we > first run the Producer for 10 minutes to generate test data, and then we run > Kafka Streams under a number of configurations to measure its performance. > Our observation was a 10% regression in throughput under the simplest > configuration, in which Streams simply consumes from a topic and does nothing > else. That benchmark actually runs faster than the producer that generates > the test data, so its throughput is bounded by the data generator's > throughput. After investigation, we realized that the regression was in the > data generator, not the consumer or Streams. > We have numerous benchmark runs leading up to the commit in question, and > they all show a throughput in the neighborhood of 115,000 records per second. > We also have 40 runs including and after that commit, and they all show a > throughput in the neighborhood of 105,000 records per second. A test on > [trunk with the commit reverted|https://github.com/apache/kafka/pull/12342] > shows a return to around 115,000 records per second. 
> Config: > {code:java} > final Properties properties = new Properties(); > properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker); > properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, > StringSerializer.class); > properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, > StringSerializer.class); > {code} > Here's the producer code in the data generator. Our tests were running with > three produceThreads. > {code:java} > for (int t = 0; t < produceThreads; t++) { > futures.add(executorService.submit(() -> { > int threadTotal = 0; > long lastPrint = start; > final long printInterval = Duration.ofSeconds(10).toMillis(); > long now; > try (final org.apache.kafka.clients.producer.Producer > producer = new KafkaProducer<>(producerConfig(broker))) { > while (limit > (now = System.currentTimeMillis()) - start) { > for (int i = 0; i < 1000; i++) { > final String key = keys.next(); > final String data = dataGen.generate(); > producer.send(new ProducerRecord<>(topic, key, > valueBuilder.apply(key, data))); > threadTotal++; > } > if ((now - lastPrint) > printInterval) { > System.out.println(Thread.currentThread().getName() + " > produced " + numberFormat.format(threadTotal) + " to " + topic + " in " + > Duration.ofMillis(now - start)); > lastPrint = now; > } > } > } > total.addAndGet(threadTotal); > System.out.println(Thread.currentThread().getName() + " finished (" + > numberFormat.format(threadTotal) + ") in " + Duration.ofMillis(now - start)); > })); > }{code} > As you can see, this is a very basic usage. -- This message was sent by Atlassian Jira (v8.20.10#820010)
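As a quick check on the figures quoted in the issue, the relative drop can be computed directly from the two throughput numbers. This is only a worked calculation on the approximate values from the report (115,000 and 105,000 records per second); the class and method names are made up for illustration.

```java
final class ThroughputDelta {
    /** Relative drop from baseline to observed, expressed as a fraction of the baseline. */
    static double relativeDrop(double baseline, double observed) {
        return (baseline - observed) / baseline;
    }

    public static void main(String[] args) {
        // Approximate figures from the report, in records per second.
        double drop = relativeDrop(115_000, 105_000);
        System.out.printf("relative drop: %.1f%%%n", drop * 100);
    }
}
```

With these rounded inputs the drop works out to 10,000 / 115,000, or about 8.7%, which is in line with the "10%" quoted in the report given that both throughput figures are described as approximate.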
[jira] [Updated] (KAFKA-14087) Add jmh benchmark for producer with MockClient
[ https://issues.apache.org/jira/browse/KAFKA-14087?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Artem Livshits updated KAFKA-14087: --- Description: Something like this {code:java} Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); Time time = Time.SYSTEM; AtomicInteger offset = new AtomicInteger(0); MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2)); ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); StringBuilder value = new StringBuilder("foo"); for (int i = 0; i < 1000; i++) value.append("x"); AtomicInteger totalRecords = new AtomicInteger(0); long start = time.milliseconds(); CompletableFuture[] futures = new CompletableFuture[3]; for (int i = 0; i < futures.length; i++) { futures[i] = CompletableFuture.runAsync(() -> { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); MockClient client = new MockClient(time, metadata) { @Override public void send(ClientRequest request, long now) { super.send(request, now); if (request.apiKey() == ApiKeys.PRODUCE) { // Prepare response data from request. 
ProduceResponseData responseData = new ProduceResponseData(); ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build(); produceRequest.data().topicData().forEach(topicData -> topicData.partitionData().forEach(partitionData -> { String topic = topicData.name(); ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic); if (tpr == null) { tpr = new ProduceResponseData.TopicProduceResponse().setName(topic); responseData.responses().add(tpr); } tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse() .setIndex(partitionData.index()) .setRecordErrors(Collections.emptyList()) .setBaseOffset(offset.addAndGet(1)) .setLogAppendTimeMs(time.milliseconds()) .setLogStartOffset(0) .setErrorMessage("") .setErrorCode(Errors.NONE.code())); })); // Schedule a reply to come after some time to mock broker latency. executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS); } } }; client.updateMetadata(initialUpdateResponse); InitProducerIdResponseData responseData = new InitProducerIdResponseData() .setErrorCode(Errors.NONE.code()) .setProducerEpoch((short) 0) .setProducerId(42) .setThrottleTimeMs(0); client.prepareResponse(body -> body instanceof InitProducerIdRequest, new InitProducerIdResponse(responseData), false); try (KafkaProducer producer = kafkaProducer( configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time )) { final int records = 20_000_000; for (int k = 0; k < records; k++) { producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString())); } totalRecords.addAndGet(records); } }); } for (CompletableFuture future : futures) { future.get(); } {code} was: Something like this {code:java} Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); Time
[jira] [Created] (KAFKA-14087) Add jmh benchmark for producer with MockClient
Artem Livshits created KAFKA-14087: -- Summary: Add jmh benchmark for producer with MockClient Key: KAFKA-14087 URL: https://issues.apache.org/jira/browse/KAFKA-14087 Project: Kafka Issue Type: Improvement Components: producer Reporter: Artem Livshits Something like this {code:java} Map configs = new HashMap<>(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000"); configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none"); Time time = Time.SYSTEM; AtomicInteger offset = new AtomicInteger(0); MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 2)); ProducerMetadata metadata = newMetadata(0, Long.MAX_VALUE); StringBuilder value = new StringBuilder("foo"); for (int i = 0; i < 1000; i++) value.append("x"); AtomicInteger totalRecords = new AtomicInteger(0); long start = time.milliseconds(); CompletableFuture[] futures = new CompletableFuture[3]; for (int i = 0; i < futures.length; i++) { futures[i] = CompletableFuture.runAsync(() -> { ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); MockClient client = new MockClient(time, metadata) { @Override public void send(ClientRequest request, long now) { super.send(request, now); if (request.apiKey() == ApiKeys.PRODUCE) { // Prepare response data from request. 
ProduceResponseData responseData = new ProduceResponseData(); ProduceRequest produceRequest = (ProduceRequest) request.requestBuilder().build(); produceRequest.data().topicData().forEach(topicData -> topicData.partitionData().forEach(partitionData -> { String topic = topicData.name(); ProduceResponseData.TopicProduceResponse tpr = responseData.responses().find(topic); if (tpr == null) { tpr = new ProduceResponseData.TopicProduceResponse().setName(topic); responseData.responses().add(tpr); } tpr.partitionResponses().add(new ProduceResponseData.PartitionProduceResponse() .setIndex(partitionData.index()) .setRecordErrors(Collections.emptyList()) .setBaseOffset(offset.addAndGet(1)) .setLogAppendTimeMs(time.milliseconds()) .setLogStartOffset(0) .setErrorMessage("") .setErrorCode(Errors.NONE.code())); })); // Schedule a reply to come after some time to mock broker latency. executorService.schedule(() -> respond(new ProduceResponse(responseData)), 20, TimeUnit.MILLISECONDS); } } }; client.updateMetadata(initialUpdateResponse); InitProducerIdResponseData responseData = new InitProducerIdResponseData() .setErrorCode(Errors.NONE.code()) .setProducerEpoch((short) 0) .setProducerId(42) .setThrottleTimeMs(0); client.prepareResponse(body -> body instanceof InitProducerIdRequest, new InitProducerIdResponse(responseData), false); try (KafkaProducer producer = kafkaProducer( configs, new StringSerializer(), new StringSerializer(), metadata, client, null, time )) { final int records = 20_000_000; for (int k = 0; k < records; k++) { producer.send(new ProducerRecord<>("topic", null, start, "key-" + k, value.toString())); } totalRecords.addAndGet(records); } }); } for (CompletableFuture future : futures) { future.get(); } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] artemlivshits commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer
artemlivshits commented on code in PR #12365: URL: https://github.com/apache/kafka/pull/12365#discussion_r924715457 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1465,13 +1465,17 @@ public boolean isDone() { private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; -private final ProducerRecord record; +private ProducerRecord record; +private final String topic; protected int partition = RecordMetadata.UNKNOWN_PARTITION; private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; this.record = record; +// Note a record would be null only if the client application has a bug, but we don't want to +// have NPE here, because the interceptors would not be notified (see .doSend). +topic = record != null ? record.topic() : null; Review Comment: It checks that the exception is thrown and then it checks that interceptors are called. Probably the test is just sloppy and could use a different error condition. KAFKA-14086 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1491,20 +1495,25 @@ public void setPartition(int partition) { if (log.isTraceEnabled()) { // Log the message here, because we don't know the partition before that. -log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); +log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition); } + +// Reset record to null here so that it doesn't have to be alive as long as the batch is. +record = null; Review Comment: Updated to extract all record info in the constructor. 
## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1491,20 +1495,25 @@ public void setPartition(int partition) { if (log.isTraceEnabled()) { Review Comment: KAFKA-14085
[jira] [Created] (KAFKA-14086) Cleanup PlaintextConsumerTest.testInterceptors to not pass null record
Artem Livshits created KAFKA-14086: -- Summary: Cleanup PlaintextConsumerTest.testInterceptors to not pass null record Key: KAFKA-14086 URL: https://issues.apache.org/jira/browse/KAFKA-14086 Project: Kafka Issue Type: Task Reporter: Artem Livshits See https://github.com/apache/kafka/pull/12365/files#r919746298
[jira] [Created] (KAFKA-14085) Clean up usage of asserts in KafkaProducer
Artem Livshits created KAFKA-14085: -- Summary: Clean up usage of asserts in KafkaProducer Key: KAFKA-14085 URL: https://issues.apache.org/jira/browse/KAFKA-14085 Project: Kafka Issue Type: Task Components: producer Reporter: Artem Livshits See https://github.com/apache/kafka/pull/12365/files#r919749970
[GitHub] [kafka] guozhangwang commented on pull request #12387: KAFKA-10199: Add RESUME in state updater
guozhangwang commented on PR #12387: URL: https://github.com/apache/kafka/pull/12387#issuecomment-1189321338 Thanks @cadonna , I've incorporated your comments and merged to trunk.
[GitHub] [kafka] guozhangwang merged pull request #12387: KAFKA-10199: Add RESUME in state updater
guozhangwang merged PR #12387: URL: https://github.com/apache/kafka/pull/12387
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12387: KAFKA-10199: Add RESUME in state updater
guozhangwang commented on code in PR #12387: URL: https://github.com/apache/kafka/pull/12387#discussion_r924732715 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ## @@ -723,6 +723,106 @@ private void shouldNotPauseTaskInRemovedTasks(final Task task) throws Exception verifyPausedTasks(); } +@Test +public void shouldResumeActiveStatefulTask() throws Exception { +final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); +shouldResumeStatefulTask(task); +verify(changelogReader, times(2)).enforceRestoreActive(); +} + +@Test +public void shouldResumeStandbyTask() throws Exception { +final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); +shouldResumeStatefulTask(task); +verify(changelogReader, times(2)).transitToUpdateStandby(); +} + +private void shouldResumeStatefulTask(final Task task) throws Exception { +stateUpdater.start(); +stateUpdater.add(task); + +stateUpdater.pause(task.id()); + +verifyPausedTasks(task); +verifyUpdatingTasks(); + +stateUpdater.resume(task.id()); + +verifyPausedTasks(); +verifyUpdatingTasks(task); +} + +@Test +public void shouldIgnoreResumingNotPausedTasks() throws Exception { Review Comment: ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12387: KAFKA-10199: Add RESUME in state updater
guozhangwang commented on code in PR #12387: URL: https://github.com/apache/kafka/pull/12387#discussion_r924731119 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ## @@ -428,6 +456,113 @@ private void shouldRemoveStatefulTask(final Task task) throws Exception { verify(changelogReader).unregister(task.changelogPartitions()); } +@Test +public void shouldPauseActiveStatefulTask() throws Exception { +final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); +shouldPauseStatefulTask(task); +verify(changelogReader, never()).transitToUpdateStandby(); +} + +@Test +public void shouldPauseStandbyTask() throws Exception { +final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); +shouldPauseStatefulTask(task); +verify(changelogReader, times(1)).transitToUpdateStandby(); +} + +@Test +public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception { +final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); +final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0)); + +stateUpdater.start(); +stateUpdater.add(task1); +stateUpdater.add(task2); + +stateUpdater.pause(task1.id()); + +verifyPausedTasks(task1); +verifyCheckpointTasks(true, task1); +verifyRestoredActiveTasks(); +verifyRemovedTasks(); +verifyUpdatingTasks(task2); +verifyExceptionsAndFailedTasks(); +verify(changelogReader, times(1)).enforceRestoreActive(); +verify(changelogReader, times(1)).transitToUpdateStandby(); +} + +private void shouldPauseStatefulTask(final Task task) throws Exception { +stateUpdater.start(); +stateUpdater.add(task); + +stateUpdater.pause(task.id()); + +verifyPausedTasks(task); +verifyCheckpointTasks(true, task); +verifyRestoredActiveTasks(); +verifyRemovedTasks(); 
+verifyUpdatingTasks(); +verifyExceptionsAndFailedTasks(); +} + +@Test +public void shouldResumeActiveStatefulTask() throws Exception { +final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); +shouldResumeStatefulTask(task); +verify(changelogReader, times(2)).enforceRestoreActive(); +} + +@Test +public void shouldResumeStandbyTask() throws Exception { +final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); +shouldResumeStatefulTask(task); +verify(changelogReader, times(2)).transitToUpdateStandby(); +} + +private void shouldResumeStatefulTask(final Task task) throws Exception { +stateUpdater.start(); +stateUpdater.add(task); + +stateUpdater.pause(task.id()); + +verifyPausedTasks(task); +verifyUpdatingTasks(); + +stateUpdater.resume(task.id()); + +verifyPausedTasks(); +verifyUpdatingTasks(task); +} + +@Test +public void shouldRemovePausedTask() throws Exception { +final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)); +stateUpdater.start(); +stateUpdater.add(task); + +stateUpdater.pause(task.id()); + +verifyPausedTasks(task); +verifyCheckpointTasks(true, task); +verifyRestoredActiveTasks(); +verifyRemovedTasks(); +verifyUpdatingTasks(); + +stateUpdater.remove(task.id()); + +verifyPausedTasks(); +verifyRemovedTasks(task); +verifyRestoredActiveTasks(); +verifyUpdatingTasks(); + +stateUpdater.resume(task.id()); + +verifyPausedTasks(); +verifyRemovedTasks(task); +verifyRestoredActiveTasks(); +verifyUpdatingTasks(); +} Review Comment: Sounds good, ack. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log
dengziming commented on PR #12274: URL: https://github.com/apache/kafka/pull/12274#issuecomment-1189293753 @jsancio I think the last open question is that there may be more than one registration record for each broker after restarting it, so can we rely on the broker epoch? I need some time to check the logic before making a final decision.
[GitHub] [kafka] clolov commented on pull request #12409: KAFKA-14058: Migrate ExactlyOnceWorkerSourceTaskTest from EasyMock and PowerMock to Mockito
clolov commented on PR #12409: URL: https://github.com/apache/kafka/pull/12409#issuecomment-1189292431 Oh, apologies, I didn't get a notification for this. Yep, I will review it shortly.
[GitHub] [kafka] clolov commented on pull request #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito
clolov commented on PR #12422: URL: https://github.com/apache/kafka/pull/12422#issuecomment-1189290301 @C0urante I believe you might have the required context to review this pull request :)
[GitHub] [kafka] clolov opened a new pull request, #12422: KAFKA-13982: Move WorkerConfigTransformerTest to use Mockito
clolov opened a new pull request, #12422: URL: https://github.com/apache/kafka/pull/12422 Addressing https://issues.apache.org/jira/browse/KAFKA-13982.
[GitHub] [kafka] mjsax merged pull request #12421: Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handle…
mjsax merged PR #12421: URL: https://github.com/apache/kafka/pull/12421
[jira] [Commented] (KAFKA-14079) Source task will not commit offsets and develops memory leak if "error.tolerance" is set to "all"
[ https://issues.apache.org/jira/browse/KAFKA-14079?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568626#comment-17568626 ] Randall Hauch commented on KAFKA-14079: --- Following up with some additional detail: This issue can affect users that are upgrading to AK 3.2.0, even if they don't modify any Connect worker config or connector configurations. For example, if a user has a pre-AK 3.2.0 Connect installation running with one or more source connector configurations that use {{error.tolerance=all}}, then when that Connect installation is upgraded to AK 3.2.0 _and_ subsequently the producer fails to send and ack messages generated by the source connector (e.g., message too large), then Connect will continue to write records to topics but will no longer commit source offsets for that connector. As mentioned above, Connect will accumulate those additional records in memory, causing the worker to eventually fail with an OOM. Unfortunately, restarting is not likely to be helpful, either: the source offsets are not changed/committed once this condition happens, so upon restart the connector will resume from the previously-committed source offsets and will likely regenerate the same problematic messages as before, triggering the problem again and causing the same OOM. The only way to recover is to fix the underlying problem reported by the producer (e.g., message too large), and restart the Connect workers. Luckily the problems reported by the producer are captured in the worker logs. Note that changing the connector configuration to use {{error.tolerance=none}} will cause the connector to stop/fail as soon as the producer fails to write a record to the topic (e.g., message too large), and will not generate duplicate messages beyond the first problematic one (like with {{error.tolerance=all}}). But again, the underlying problem must be corrected before the connector can be restarted successfully. 
This issue does not affect: * sink connectors; * source connector configurations that use {{error.tolerance=none}}, which is the default behavior; or * source connectors that never use or rely upon source offsets (a smallish fraction of all source connector types) Most source connectors do rely upon source offsets, though, so this is a fairly serious issue. Thanks, [~cshannon] and [~ChrisEgerton] for the quick work and review of these PRs. Both PRs linked above (one for the `trunk` branch and one for the `3.2` branch) have been merged. The `3.2` PR was merged before the first 3.2.1 RC, and so the AK 3.2.1 release should include this fix. > Source task will not commit offsets and develops memory leak if > "error.tolerance" is set to "all" > - > > Key: KAFKA-14079 > URL: https://issues.apache.org/jira/browse/KAFKA-14079 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 3.2.0 > Reporter: Christopher L. Shannon > Assignee: Christopher L. Shannon > Priority: Critical > Fix For: 3.3.0, 3.2.1 > > > KAFKA-13348 added the ability to ignore producer exceptions by setting > {{error.tolerance}} to {{all}}. When this is set to all a null record > metadata is passed to commitRecord() and the task continues. > The issue is that records are tracked by {{SubmittedRecords}} and the first > time an error happens the code does not ack the record with the error and > just skips it so it will not have the offsets committed or be removed from > SubmittedRecords before calling commitRecord(). > This leads to a bug where future offsets won't be committed anymore and also > a memory leak because the algorithm that removes acked records from the > internal map to commit offsets [looks > |https://github.com/apache/kafka/blob/3.2.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SubmittedRecords.java#L177] > at the head of the Deque in which the records are tracked, and if it sees the > record is unacked it will not process any more removals. 
This leads to all new > records that go through the task continuing to be added, without their offsets > being committed and without ever being removed from tracking, until an OOM error occurs. > The fix is to make sure to ack the failed records so they can have their > offsets committed and be removed from tracking. This is fine to do as the > records are intended to be skipped and not reprocessed. Metrics need to > be updated as well.
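The head-of-deque behavior described in the issue can be illustrated with a toy model. This is a minimal sketch and not the actual `SubmittedRecords` class from Kafka Connect, which is more involved; `HeadBlockingTracker` and its methods are hypothetical names chosen for illustration. (As a side note, the Connect property is spelled `errors.tolerance`; the thread writes `error.tolerance`.)

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model: records are tracked in submission order, and offsets only become
// committable by removing acked entries from the head of the deque.
final class HeadBlockingTracker {
    static final class Entry {
        final long offset;
        private boolean acked;

        Entry(long offset) { this.offset = offset; }

        void ack() { acked = true; }
    }

    private final Deque<Entry> inFlight = new ArrayDeque<>();

    Entry submit(long offset) {
        Entry entry = new Entry(offset);
        inFlight.addLast(entry);
        return entry;
    }

    /** Removes acked entries from the head only; stops at the first unacked entry. */
    List<Long> drainCommittable() {
        List<Long> committable = new ArrayList<>();
        while (!inFlight.isEmpty() && inFlight.peekFirst().acked) {
            committable.add(inFlight.pollFirst().offset);
        }
        return committable;
    }

    int tracked() { return inFlight.size(); }
}
```

In this model, if the first submitted entry is never acked (the skipped, failed record), `drainCommittable()` keeps returning an empty list no matter how many later entries are acked, and `tracked()` grows without bound. Acking the failed entry, as the fix proposes, unblocks the head so that it and everything acked behind it drain in order.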
[GitHub] [kafka] rhauch commented on pull request #12412: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL
rhauch commented on PR #12412: URL: https://github.com/apache/kafka/pull/12412#issuecomment-1189234384 As you know, the AK project is actively working on a 3.2.1 release; see the [Dev List thread](https://lists.apache.org/list?d...@kafka.apache.org:2022-7). I've responded to the thread mentioning this issue is no longer a blocker, and should appear in the forthcoming 3.2.1 release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aiquestion commented on pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on PR #12349: URL: https://github.com/apache/kafka/pull/12349#issuecomment-1189224423 @showuon Thanks a lot for your comments. I think I've addressed all of them. And yes, I can open a new PR for the refinements if you think the current code is okay to merge but still needs some polishing.
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924677278 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { } } +@Test +public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() { +rebalanceConfig = buildRebalanceConfig(Optional.of("group-id")); +ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig, +new Metrics(), +assignors, +true, +subscriptions); +client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE)); +subscriptions.subscribe(singleton(topic1), rebalanceListener); +client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE)); +client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE)); +coordinator.joinGroupIfNeeded(time.timer(Long.MAX_VALUE)); Review Comment: Thanks! the test looks much simpler
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924676827 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -1299,6 +1377,178 @@ public void testForceMetadataDeleteForPatternSubscriptionDuringRebalance() { } } +@Test +public void testOnJoinPrepareWithOffsetCommitShouldSuccessAfterRetry() { +rebalanceConfig = buildRebalanceConfig(Optional.of("group-id")); Review Comment: fixed, thanks
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924676541 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -1251,6 +1252,83 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() { assertFalse(coordinator.rejoinNeededOrPending()); } +@Test +public void testPatternSubscribeAndRejoinGroupAfterTopicDelete() { Review Comment: I've deleted it.
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924676347 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @ParameterizedTest + @ValueSource(strings = Array( +"org.apache.kafka.clients.consumer.CooperativeStickyAssignor", +"org.apache.kafka.clients.consumer.RangeAssignor")) + def testRebalanceAndRejoin(assignmentStrategy: String): Unit = { +// create 2 consumers +this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy) +this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") +val consumer1 = createConsumer() +val consumer2 = createConsumer() + +// create a new topic, have 2 partitions +val topic = "topic1" +val producer = createProducer() +val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100) + +assertEquals(0, consumer1.assignment().size) +assertEquals(0, consumer2.assignment().size) + +val lock = new ReentrantLock() +var generationId1 = -1 +var memberId1 = "" +val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]): Unit = { +if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) { + fail(s"Time out while awaiting for lock.") +} +try { + generationId1 = consumer1.groupMetadata().generationId() + memberId1 = consumer1.groupMetadata().memberId() +} finally { + lock.unlock() +} + } +} +val consumerPoller1 = new ConsumerAssignmentPoller(consumer1, List(topic), Set.empty, customRebalanceListener) +consumerPoller1.start() +TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment() == expectedAssignment, + s"Timed out while awaiting expected 
assignment change to $expectedAssignment.") + +// Since the consumer1 already completed the rebalance, +// the `onPartitionsAssigned` rebalance listener will be invoked to set the generationId and memberId +var stableGeneration = -1 +var stableMemberId1 = "" +if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) { + fail(s"Time out while awaiting for lock.") +} +try { + stableGeneration = generationId1 + stableMemberId1 = memberId1 +} finally { + lock.unlock() +} + +val consumerPoller2 = subscribeConsumerAndStartPolling(consumer2, List(topic)) +TestUtils.waitUntilTrue(() => consumerPoller1.consumerAssignment().size == 1, + s"Timed out while awaiting expected assignment change to 1.") +TestUtils.waitUntilTrue(() => consumerPoller2.consumerAssignment().size == 1, + s"Timed out while awaiting expected assignment change to 1.") + +if (!lock.tryLock(3000, TimeUnit.MILLISECONDS)) { + fail(s"Time out while awaiting for lock.") +} +try { + if (assignmentStrategy.equals(classOf[CooperativeStickyAssignor].getName)) { +// cooperative rebalance should rebalance twice before finally stable +assertEquals(stableGeneration + 2, generationId1) + } else { +// eager rebalance should rebalance once once before finally stable Review Comment: fixed, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924676087 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map future = maybeAutoCommitOffsetsAsync(); -// return true when -// 1. future is null, which means no commit request sent, so it is still considered completed -// 2. offset commit completed -// 3. offset commit failed with non-retriable exception -if (future == null) -onJoinPrepareAsyncCommitCompleted = true; -else if (future.succeeded()) -onJoinPrepareAsyncCommitCompleted = true; -else if (future.failed() && !future.isRetriable()) { -log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage()); -onJoinPrepareAsyncCommitCompleted = true; +// and there is no in-flight offset commit request +if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) { +autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync(); } +// wait for commit offset response before timer expired. +if (autoCommitOffsetRequestFuture != null) { +Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ? + timer : joinPrepareTimer; +client.poll(autoCommitOffsetRequestFuture, pollTimer); +timer.update(); +joinPrepareTimer.update(); +} + +// keep retrying the offset commit when: +// 1. offset commit haven't done (and joinPrepareTime not expired) +// 2. failed with retryable exception (and joinPrepareTime not expired) +// Otherwise, continue to revoke partitions, ex: +// 1. if joinPrepareTime has expired +// 2. if offset commit failed with no-retryable exception +// 3. if offset commit success +boolean onJoinPrepareAsyncCommitCompleted = true; +if (autoCommitOffsetRequestFuture != null) { +if (joinPrepareTimer.isExpired()) { +log.error("Asynchronous auto-commit of offsets failed: joinPrepare timeout. 
Will continue to join group"); +} else if (!autoCommitOffsetRequestFuture.isDone()) { +onJoinPrepareAsyncCommitCompleted = false; +} else if (autoCommitOffsetRequestFuture.failed() && autoCommitOffsetRequestFuture.isRetriable()) { +log.debug("Asynchronous auto-commit of offsets failed with retryable error: {}. Will retry it.", + autoCommitOffsetRequestFuture.exception().getMessage()); +onJoinPrepareAsyncCommitCompleted = false; +} else if (autoCommitOffsetRequestFuture.failed() && !autoCommitOffsetRequestFuture.isRetriable()) { +log.error("Asynchronous auto-commit of offsets failed: {}. Will continue to join group.", + autoCommitOffsetRequestFuture.exception().getMessage()); +} +if (autoCommitOffsetRequestFuture.isDone()) { +autoCommitOffsetRequestFuture = null; +} +} +if (!onJoinPrepareAsyncCommitCompleted) { +timer.sleep(Math.min(timer.remainingMs(), rebalanceConfig.retryBackoffMs)); Review Comment: you are right, i just try to update every timer in my last commit. :-) fixed, thanks ## core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala: ## @@ -969,6 +973,89 @@ class PlaintextConsumerTest extends BaseConsumerTest { } } + @ParameterizedTest + @ValueSource(strings = Array( +"org.apache.kafka.clients.consumer.CooperativeStickyAssignor", +"org.apache.kafka.clients.consumer.RangeAssignor")) + def testRebalanceAndRejoin(assignmentStrategy: String): Unit = { +// create 2 consumers +this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "rebalance-and-rejoin-group") + this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, assignmentStrategy) +this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") +val consumer1 = createConsumer() +val consumer2 = createConsumer() + +// create a new topic, have 2 partitions +val topic = "topic1" +val producer = createProducer() +val expectedAssignment = createTopicAndSendRecords(producer, topic, 2, 100) + +assertEquals(0, consumer1.assignment().size) 
+assertEquals(0, consumer2.assignment().size) + +val lock = new ReentrantLock() +var generationId1 = -1 +var memberId1 = "" +val customRebalanceListener = new ConsumerRebalanceListener { + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + } + override def onPartitionsAssigned(partitions:
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924675152 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map future = maybeAutoCommitOffsetsAsync(); -// return true when -// 1. future is null, which means no commit request sent, so it is still considered completed -// 2. offset commit completed -// 3. offset commit failed with non-retriable exception -if (future == null) -onJoinPrepareAsyncCommitCompleted = true; -else if (future.succeeded()) -onJoinPrepareAsyncCommitCompleted = true; -else if (future.failed() && !future.isRetriable()) { -log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage()); -onJoinPrepareAsyncCommitCompleted = true; +// and there is no in-flight offset commit request +if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) { +autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync(); } +// wait for commit offset response before timer expired. +if (autoCommitOffsetRequestFuture != null) { +Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ? + timer : joinPrepareTimer; +client.poll(autoCommitOffsetRequestFuture, pollTimer); +timer.update(); Review Comment: fixed, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924674693 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -191,7 +191,7 @@ public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig, * @param memberId The identifier of this member in the previous group or "" if there was none * @return true If onJoinPrepare async commit succeeded, false otherwise */ -protected abstract boolean onJoinPrepare(int generation, String memberId); +protected abstract boolean onJoinPrepare(Timer timer, int generation, String memberId); Review Comment: fixed, thanks ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map future = maybeAutoCommitOffsetsAsync(); -// return true when -// 1. future is null, which means no commit request sent, so it is still considered completed -// 2. offset commit completed -// 3. offset commit failed with non-retriable exception -if (future == null) -onJoinPrepareAsyncCommitCompleted = true; -else if (future.succeeded()) -onJoinPrepareAsyncCommitCompleted = true; -else if (future.failed() && !future.isRetriable()) { -log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage()); -onJoinPrepareAsyncCommitCompleted = true; +// and there is no in-flight offset commit request +if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) { +autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync(); } +// wait for commit offset response before timer expired. +if (autoCommitOffsetRequestFuture != null) { +Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ? + timer : joinPrepareTimer; +client.poll(autoCommitOffsetRequestFuture, pollTimer); +timer.update(); +joinPrepareTimer.update(); +} + +// keep retrying the offset commit when: +// 1. 
offset commit haven't done (and joinPrepareTime not expired) +// 2. failed with retryable exception (and joinPrepareTime not expired) Review Comment: fixed, thanks ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map future = maybeAutoCommitOffsetsAsync(); -// return true when -// 1. future is null, which means no commit request sent, so it is still considered completed -// 2. offset commit completed -// 3. offset commit failed with non-retriable exception -if (future == null) -onJoinPrepareAsyncCommitCompleted = true; -else if (future.succeeded()) -onJoinPrepareAsyncCommitCompleted = true; -else if (future.failed() && !future.isRetriable()) { -log.error("Asynchronous auto-commit of offsets failed: {}", future.exception().getMessage()); -onJoinPrepareAsyncCommitCompleted = true; +// and there is no in-flight offset commit request +if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) { +autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync(); } +// wait for commit offset response before timer expired. +if (autoCommitOffsetRequestFuture != null) { +Timer pollTimer = timer.remainingMs() < joinPrepareTimer.remainingMs() ? + timer : joinPrepareTimer; +client.poll(autoCommitOffsetRequestFuture, pollTimer); +timer.update(); +joinPrepareTimer.update(); +} + +// keep retrying the offset commit when: Review Comment: fixed, thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
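The retry rules spelled out in the code comments of the `onJoinPrepare` diff quoted in this thread can be condensed into a single decision function. The sketch below is a hypothetical distillation for readers following the review: `JoinPrepareCommitDecision`, `CommitState`, and `shouldKeepRetrying` are illustrative names and are not part of the Kafka client. It assumes the commit future has been collapsed into one of five states, with `NOT_SENT` standing for the case where no auto-commit request exists.

```java
// Hypothetical distillation of the retry rules from the quoted diff.
// Names are illustrative; this is not ConsumerCoordinator code.
final class JoinPrepareCommitDecision {
    enum CommitState { NOT_SENT, IN_FLIGHT, SUCCEEDED, FAILED_RETRIABLE, FAILED_NON_RETRIABLE }

    // Returns true when the caller should keep waiting for or retrying the offset commit,
    // and false when it should go on to revoke partitions and join the group.
    static boolean shouldKeepRetrying(CommitState state, boolean joinPrepareTimerExpired) {
        if (state == CommitState.NOT_SENT || joinPrepareTimerExpired) {
            // Nothing to wait for, or the join-prepare timeout is used up: continue joining.
            return false;
        }
        // Only an unfinished commit or a retriable failure justifies another attempt.
        return state == CommitState.IN_FLIGHT || state == CommitState.FAILED_RETRIABLE;
    }

    public static void main(String[] args) {
        for (CommitState state : CommitState.values()) {
            System.out.println(state + ": retry=" + shouldKeepRetrying(state, false)
                    + ", retry after timeout=" + shouldKeepRetrying(state, true));
        }
    }
}
```

Keeping the timer check first mirrors the order of the branches in the diff, where an expired join-prepare timer ends the retries regardless of the commit outcome.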
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924670900 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -740,24 +746,59 @@ private void validateCooperativeAssignment(final Map
[GitHub] [kafka] junrao commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer
junrao commented on code in PR #12365: URL: https://github.com/apache/kafka/pull/12365#discussion_r924667928 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1491,20 +1495,25 @@ public void setPartition(int partition) { if (log.isTraceEnabled()) { // Log the message here, because we don't know the partition before that. -log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); +log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition); } + +// Reset record to null here so that it doesn't have to be alive as long as the batch is. +record = null; Review Comment: Since the only info we need from record is record.partition(), could we keep record.partition() in the instance instead of the whole record? Since record.partition() is much smaller, maybe there is no need to nullify it in setPartition()?
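One way to read this suggestion is sketched below: the callback copies only the topic and the caller-chosen partition out of the record, so the record itself never has to be retained or nulled out. The class `PartitionOnlyCallbackSketch`, the nested `AppendCallback`, and the fallback order in `effectivePartition()` are assumptions made for illustration and are not the code in the PR.

```java
// Hypothetical sketch of keeping only the partition instead of the whole record.
// It is not the KafkaProducer callback from the PR under review.
final class PartitionOnlyCallbackSketch {
    static final int UNKNOWN_PARTITION = -1;

    static final class AppendCallback {
        private final String topic;
        // Partition requested by the caller on the record, or null if none was given.
        private final Integer recordPartition;
        // Partition chosen later by the partitioner, unknown until setPartition is called.
        private int partition = UNKNOWN_PARTITION;

        AppendCallback(String topic, Integer recordPartition) {
            this.topic = topic;
            this.recordPartition = recordPartition;
        }

        void setPartition(int partition) {
            this.partition = partition;
        }

        // Assumed fallback order: partitioner result, then the record's own partition, then unknown.
        int effectivePartition() {
            if (partition != UNKNOWN_PARTITION) {
                return partition;
            }
            return recordPartition != null ? recordPartition : UNKNOWN_PARTITION;
        }

        String topic() {
            return topic;
        }
    }

    public static void main(String[] args) {
        AppendCallback callback = new AppendCallback("demo-topic", 3);
        System.out.println(callback.topic() + "-" + callback.effectivePartition());
        callback.setPartition(5);
        System.out.println(callback.topic() + "-" + callback.effectivePartition());
    }
}
```

Because a boxed `Integer` and a `String` reference are small compared with a record's key, value, and headers, holding them for the lifetime of the batch is cheap, which is the point the review comment makes.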
[GitHub] [kafka] MPeli commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
MPeli commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1189166900 I also had a problem with KRaft, which did not start on Windows. The latest commit (https://github.com/apache/kafka/pull/12331/commits/f7cc920771735576d9cfba2afe6f26fdcfb2ccd4) fixes it. The commit https://github.com/apache/kafka/pull/12331/commits/77ae23d816ea1a30a3ec970b7dfe77fd35f7fe00 most likely fixes [KAFKA-7575](https://issues.apache.org/jira/browse/KAFKA-7575) and [KAFKA-2427](https://issues.apache.org/jira/browse/KAFKA-2427), but I will have to do more testing to confirm that.
[GitHub] [kafka] splett2 commented on a diff in pull request #12416: KAFKA-13559: Fix issue where responses intermittently takes 300+ ms to respond, even when the server is idle.
splett2 commented on code in PR #12416: URL: https://github.com/apache/kafka/pull/12416#discussion_r923930734 ## core/src/test/scala/unit/kafka/network/SocketServerTest.scala: ## @@ -1878,6 +1878,97 @@ class SocketServerTest { }, false) } + /** + * Tests for KAFKA-13559 - Processing request got delayed by 300 ms in the following condition: + * 1. Client-Server communication uses SSL socket. + * 2. More than one requests are read from the socket into SslTransportLayer.netReadBuffer. + * + * This 300 ms delay occurs because the socket has no data but the buffer has data. And the sequence of events that + * leads to this situation is the following (from the server point of view): + * + * Step 1 - SslTransportLayer receives more than one requests in the socket and put it in the buffer + * (SslTransportLayer.netReadBuffer). + * Step 2 - SslTransportLayer reads all of the bytes and stores it in SslTransportLayer.appReadBuffer. + * Step 3 - Process the first request, leaving the second request in SslTransportLayer.appReadBuffer. + * Step 4 - THIS IS WHERE THE DELAY IS. Process the second request. This request is read from + * SslTransportLayer.appReadBuffer, instead of the socket. Because of this, "select(timeout)" in Selector.poll() + * should not block for 300 ms. + * + * This test is implemented following "makeSocketWithBufferedRequests()" method by putting two requests directly + * into SslTransportLayer.netReadBuffer and manually trigger the processing. 
+ * + */ + @Test + def testLatencyWithBufferedDataAndNoSocketData(): Unit = { +shutdownServerAndMetrics(server) + +// to ensure we only have 1 connection (channel) +val props = sslServerProps +val numConnections = 1 +props.put("max.connections.per.ip", numConnections.toString) + +// create server with SSL listener +val testableServer = new TestableSocketServer(KafkaConfig.fromProps(props)) +testableServer.enableRequestProcessing(Map.empty) +// dataPlaneAcceptors.get(endpoint).processors(0).selector.asInstanceOf[TestableSelector] +val testableSelector = testableServer.testableSelector +val proxyServer = new ProxyServer(testableServer) + +try { + // trigger SSL handshake by sending the first request and receiving its response without buffering + val requestBytes = producerRequestBytes() + val sslSocket = sslClientSocket(proxyServer.localPort) + + sendRequest(sslSocket, requestBytes) + val request1 = receiveRequest(testableServer.dataPlaneRequestChannel) + processRequest(testableServer.dataPlaneRequestChannel, request1) + receiveResponse(sslSocket) + + // then put 2 requests in SslTransportLayer.netReadBuffer via the ProxyServer + val connectionId = request1.context.connectionId + val listener = testableServer.config.dataPlaneListeners.head.listenerName.value + val channel = testableServer.dataPlaneAcceptor(listener).get.processors(0).channel(connectionId).getOrElse(throw new IllegalStateException("Channel not found")) + val transportLayer: SslTransportLayer = JTestUtils.fieldValue(channel, classOf[KafkaChannel], "transportLayer") + val netReadBuffer: ByteBuffer = JTestUtils.fieldValue(transportLayer, classOf[SslTransportLayer], "netReadBuffer") + + proxyServer.enableBuffering(netReadBuffer) + sendRequest(sslSocket, requestBytes) + sendRequest(sslSocket, requestBytes) + + val keysWithBufferedRead: util.Set[SelectionKey] = JTestUtils.fieldValue(testableSelector, classOf[Selector], "keysWithBufferedRead") + keysWithBufferedRead.add(channel.selectionKey) + 
JTestUtils.setFieldValue(transportLayer, "hasBytesBuffered", true) + + // process the first request in the server side + // this would move bytes from netReadBuffer to appReadBuffer, then process only the first request + processRequest(testableServer.dataPlaneRequestChannel) + + // receive response in the client side + receiveResponse(sslSocket) + + // process the second request in the server side + // this would process the second request in the appReadBuffer + // NOTE: this should not block because the data is already in the buffer, but without the fix for KAFKA-13559, + // this step will take more than 300 ms + val processTimeStart = System.currentTimeMillis() + processRequest(testableServer.dataPlaneRequestChannel) + val processTimeEnd = System.currentTimeMillis() + + // receive response in the client side + receiveResponse(sslSocket) Review Comment: I'm trying to think about how we can get stronger sequencing guarantees here. Maybe we can do something like explicitly set the testable selector's pollTimeoutOverride to some large value and explicitly call wakeup once we have the right state setup. ## core/src/test/scala/unit/kafka/network/SocketServerTest.scala:
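The delay described in the test's doc comment arises when a poll blocks in `select(timeout)` even though a complete request is already sitting in an application-level buffer. A minimal sketch of the guard against that, assuming a hypothetical per-channel bookkeeping set, is shown below. `BufferedReadTimeoutSketch`, `markBuffered`, `markDrained`, and `selectTimeoutMs` are illustrative names and do not correspond to Kafka's `Selector` API.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: never block in select() while some channel still has buffered data.
// This does not reproduce Kafka's Selector; it only illustrates the timeout decision.
final class BufferedReadTimeoutSketch {
    private final Set<String> channelsWithBufferedData = new HashSet<>();

    // Record that a channel holds decrypted bytes that have not been processed yet.
    void markBuffered(String channelId) {
        channelsWithBufferedData.add(channelId);
    }

    // Record that a channel's buffer has been fully consumed.
    void markDrained(String channelId) {
        channelsWithBufferedData.remove(channelId);
    }

    // Timeout to pass to select(): zero if buffered data is waiting, otherwise the requested value.
    long selectTimeoutMs(long requestedTimeoutMs) {
        return channelsWithBufferedData.isEmpty() ? requestedTimeoutMs : 0L;
    }

    public static void main(String[] args) {
        BufferedReadTimeoutSketch poller = new BufferedReadTimeoutSketch();
        System.out.println(poller.selectTimeoutMs(300L)); // no buffered data: block as requested
        poller.markBuffered("connection-1");
        System.out.println(poller.selectTimeoutMs(300L)); // buffered request pending: do not block
        poller.markDrained("connection-1");
        System.out.println(poller.selectTimeoutMs(300L));
    }
}
```

The socket itself reports nothing readable in this situation, so the decision has to be driven by the buffer state rather than by the selection keys returned from the operating system.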
[GitHub] [kafka] aiquestion commented on a diff in pull request #12349: KAFKA-14024: Consumer keeps Commit offset in onJoinPrepare in Cooperative rebalance
aiquestion commented on code in PR #12349: URL: https://github.com/apache/kafka/pull/12349#discussion_r924600012 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java: ## @@ -1251,6 +1252,83 @@ public void testForceMetadataRefreshForPatternSubscriptionDuringRebalance() { assertFalse(coordinator.rejoinNeededOrPending()); } +@Test +public void testPatternSubscribeAndRejoinGroupAfterTopicDelete() { Review Comment: Actually, I wrote it to show how the [KAFKA-13310](https://issues.apache.org/jira/browse/KAFKA-13310) case behaves after this change. It's not behavior I want to protect from being changed, so I think I can delete it.
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568598#comment-17568598 ] ASF GitHub Bot commented on KAFKA-13868: tombentley commented on code in PR #420: URL: https://github.com/apache/kafka-site/pull/420#discussion_r924595805 ## css/fonts.css: ## @@ -0,0 +1,82 @@ +/* cutive-mono-regular - latin-ext_latin */ +@font-face { +font-family: 'Cutive Mono'; +font-style: normal; +font-weight: 400; +src: local(''), Review Comment: Ah, thanks! > Website updates to satisfy Apache privacy policies > -- > > Key: KAFKA-13868 > URL: https://issues.apache.org/jira/browse/KAFKA-13868 > Project: Kafka > Issue Type: Bug > Components: website >Reporter: Mickael Maison >Assignee: Divij Vaidya >Priority: Critical > > The ASF has updated its privacy policy and all websites must be compliant. > The full guidelines can be found in > [https://privacy.apache.org/faq/committers.html] > The Kafka website has a few issues, including: > - It's missing a link to the privacy policy: > [https://privacy.apache.org/policies/privacy-policy-public.html] > - It's using Google Analytics > - It's using Google Fonts > - It's using scripts hosted on Cloudflare CDN > - Embedded videos don't have an image placeholder > As per the email sent to the PMC, all updates have to be done by July 22. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568584#comment-17568584 ] ASF GitHub Bot commented on KAFKA-13868: divijvaidya commented on code in PR #420: URL: https://github.com/apache/kafka-site/pull/420#discussion_r924561314 ## css/fonts.css: ## @@ -0,0 +1,82 @@ +/* cutive-mono-regular - latin-ext_latin */ +@font-face { +font-family: 'Cutive Mono'; +font-style: normal; +font-weight: 400; +src: local(''), Review Comment: https://stackoverflow.com/a/22835957 explains the rationale (it discusses the smiley variant, but the empty string follows the same logic). This is only necessary for IE6-8 (very old browsers), though, to handle an edge case (a different font with the same name installed locally). I think I would remove it to reduce confusion when reading the code.
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568573#comment-17568573 ] ASF GitHub Bot commented on KAFKA-13868: tombentley commented on code in PR #420: URL: https://github.com/apache/kafka-site/pull/420#discussion_r924521925 ## css/fonts.css: ## @@ -0,0 +1,82 @@ +/* cutive-mono-regular - latin-ext_latin */ +@font-face { +font-family: 'Cutive Mono'; +font-style: normal; +font-weight: 400; +src: local(''), Review Comment: Can you explain why the argument to `local` is the empty string? I'm no CSS expert, but from a quick google (https://stackoverflow.com/questions/3837249/font-face-src-local-how-to-use-the-local-font-if-the-user-already-has-it) it seems like it's supposed to be used for some kind of caching, but not with an empty string argument.
[GitHub] [kafka] nicolasguyomar commented on pull request #12378: MINOR : lower Metadata info log to debug for topic ID change
nicolasguyomar commented on PR #12378: URL: https://github.com/apache/kafka/pull/12378#issuecomment-1189066691
Hello @jolshan , this is a Confluent Bundle 7.2.0 console producer output, producing a "test" message in topic "nicolas"
You'll see that upon receiving the first Metadata response, we log the message that I would like to "hide", but you're probably right we should not log it at all
[2022-07-19 15:35:41,214] INFO [Producer clientId=console-producer] Resetting the last seen epoch of partition nicolas-0 to 0 since the associated topicId changed from null to 7OdYyViuRY2XmggNWuP_Vg (org.apache.kafka.clients.Metadata)
```
./kafka-console-producer --bootstrap-server=localhost:9092 --topic nicolas
[2022-07-19 15:35:30,945] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2022-07-19 15:35:31,012] INFO ProducerConfig values: (org.apache.kafka.clients.producer.ProducerConfig)
[2022-07-19 15:35:31,037] INFO [Producer clientId=console-producer] Instantiated an idempotent producer. (org.apache.kafka.clients.producer.KafkaProducer)
[2022-07-19 15:35:31,129] INFO Successfully logged in. (org.apache.kafka.common.security.authenticator.AbstractLogin)
[2022-07-19 15:35:31,227] WARN The configuration 'sasl.mechanisms' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-07-19 15:35:31,227] WARN The configuration 'schema.registry.url' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-07-19 15:35:31,227] WARN The configuration 'schema.registry.basic.auth.credentials.source' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-07-19 15:35:31,227] WARN The configuration 'sasl.username' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-07-19 15:35:31,227] WARN The configuration 'schema.registry.basic.auth.user.info' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-07-19 15:35:31,227] WARN The configuration 'sasl.password' was supplied but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
[2022-07-19 15:35:31,230] INFO Kafka version: 7.2.0-ce (org.apache.kafka.common.utils.AppInfoParser)
[2022-07-19 15:35:31,230] INFO Kafka commitId: 510078ca78367bd2542eaca9d26ebd22c5ca95cc (org.apache.kafka.common.utils.AppInfoParser)
[2022-07-19 15:35:31,230] INFO Kafka startTimeMs: 1658237731227 (org.apache.kafka.common.utils.AppInfoParser)
>[2022-07-19 15:35:32,290] INFO [Producer clientId=console-producer] Cluster ID: lkc-xrrwg (org.apache.kafka.clients.Metadata)
[2022-07-19 15:35:32,293] INFO [Producer clientId=console-producer] ProducerId set to 11385814 with epoch 0 (org.apache.kafka.clients.producer.internals.TransactionManager)
test
[2022-07-19 15:35:41,214] INFO [Producer clientId=console-producer] Resetting the last seen epoch of partition nicolas-0 to 0 since the associated topicId changed from null to 7OdYyViuRY2XmggNWuP_Vg (org.apache.kafka.clients.Metadata)
>^C[2022-07-19 15:35:55,532] INFO [Producer clientId=console-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
[2022-07-19 15:35:55,557] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
[2022-07-19 15:35:55,557] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
[2022-07-19 15:35:55,557] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
[2022-07-19 15:35:55,558] INFO App info kafka.producer for console-producer unregistered (org.apache.kafka.common.utils.AppInfoParser)
```
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568566#comment-17568566 ] ASF GitHub Bot commented on KAFKA-13868: divijvaidya commented on PR #424: URL: https://github.com/apache/kafka-site/pull/424#issuecomment-1189065641 @mimaison please review for the GDPR compliance.
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568565#comment-17568565 ]
ASF GitHub Bot commented on KAFKA-13868:
divijvaidya opened a new pull request, #424: URL: https://github.com/apache/kafka-site/pull/424
As per the [ASF privacy policy](https://privacy.apache.org/faq/committers.html), Google Analytics should be replaced with the Apache-hosted version of Matomo to remain compliant with GDPR.
Email thread where we received the site ID that is used with Matomo: https://lists.apache.org/thread/0rpo0ffcd70c2yxfnqfqk43oyg7c8x8d
## Code changes
- Remove Google Analytics script
- Remove `google-site-verification` files, which are used by Google to verify ownership of the site.
- Add Matomo script to the `` section (all JS scripts should ideally be placed there and not in ``).
## Results
After deploying the changes, we should be able to analyse the results at https://analytics.apache.org/index.php?module=MultiSites=index=1=day=yesterday
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568535#comment-17568535 ]
ASF GitHub Bot commented on KAFKA-13868:
showuon commented on PR #420: URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1189006953
> That is a very fair question to ask. The number of actual font files is smaller because the new font files contain all the charsets in one file.
I see. Thanks.
> I will wait till tomorrow for others to add their thoughts. After that I can change this PR with the approach that you mentioned. Sounds ok Luke?
Yes, let's see what other people think. But I don't think it needs to adopt my approach if the font output is the same. Thank you.
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568515#comment-17568515 ] ASF GitHub Bot commented on KAFKA-13868: mimaison merged PR #421: URL: https://github.com/apache/kafka-site/pull/421
[GitHub] [kafka] viktorsomogyi commented on pull request #11886: KAFKA-13730: OAuth access token validation fails if it does not conta…
viktorsomogyi commented on PR #11886: URL: https://github.com/apache/kafka/pull/11886#issuecomment-1188969830 @rajinisivaram @omkreddy can you help in getting this reviewed?
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568483#comment-17568483 ]
ASF GitHub Bot commented on KAFKA-13868:
divijvaidya commented on PR #420: URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1188923709
> We can download the font via https://fonts.gstatic.com/s/cutivemono/v14/m8JWjfRfY7WVjVi2E-K9H6RMTm6o39ucNvc.woff2
Oh yes. We can do that but I think that would be less maintainable. This is because when a new version of these fonts is available, we won't have a mechanism to update them. I can go ahead with this approach too if that makes it safer to accept this change. I don't have any strong opinions on this.
> Does that mean we actually only use 9 of them? How do you know that?
That is a very fair question to ask. The number of actual font files is smaller because the new font files contain all the charsets in one file.
I will wait till tomorrow for others to add their thoughts. After that I can change this PR with the approach that you mentioned. Sounds ok Luke?
[GitHub] [kafka] cshannon commented on pull request #12412: KAFKA-14079 - Ack failed records in WorkerSourceTask when error tolerance is ALL
cshannon commented on PR #12412: URL: https://github.com/apache/kafka/pull/12412#issuecomment-1188913781 @rhauch - Great, thanks for merging this so quickly. I've been running a custom build with this fix for a couple days and it has looked good so far so it will be nice to get 3.2.1 out soon.
[jira] [Commented] (KAFKA-14074) Restarting a broker during re-assignment can leave log directory entries
[ https://issues.apache.org/jira/browse/KAFKA-14074?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568465#comment-17568465 ] Adrian Preston commented on KAFKA-14074: Thanks for pointing out KAFKA-13972, [~jolshan]. Unfortunately I don't think this is exactly the same problem. I've built the branch corresponding to the pull request in KAFKA-13972 ([https://github.com/apache/kafka/pull/12271]), and can still reproduce the stray topic partition directories problem described in this issue. > Restarting a broker during re-assignment can leave log directory entries > > > Key: KAFKA-14074 > URL: https://issues.apache.org/jira/browse/KAFKA-14074 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.8.0, 3.1.0 >Reporter: Adrian Preston >Priority: Major > > Re-starting a broker while replicas are being assigned away from the broker > can result in topic partition directories being left in the broker’s log > directory. This can trigger further problems if such a topic is deleted and > re-created. These problems occur when replicas for the new topic are placed > on a broker that hosts a “stale” topic partition directory of the same name, > causing the on-disk topic partition state held by different brokers in the > cluster to diverge. > We have also been able to re-produce variants of this problem using Kafka 2.8 > and 3.1, as well as Kafka built from the head of the apache/kafka repository > (at the time of writing this is commit: > 94d4fdeb28b3cd4d474d943448a7ef653eaa145d). We have *not* been able to > re-produce this problem with Kafka running in KRaft mode. 
> A minimal re-create for topic directories being left on disk is as follows: > # Start ZooKeeper and a broker (both using the sample config) > # Create 100 topics: each with 1 partition, and with replication factor 1 > # Add a second broker to the Kafka cluster (with minor edits to the sample > config for: {{{}broker.id{}}}, {{{}listeners{}}}, and {{{}log.dirs{}}}) > # Issue a re-assignment that moves all of the topic partition replicas from > the first broker to the second broker > # While this re-assignment is taking place shutdown the first broker (you > need to be quick with only two brokers and 100 topics…) > # Wait a few seconds for the re-assignment to stall > # Restart the first broker and wait for the re-assignment to complete and it > to remove any partially deleted topics (e.g. those with a “-delete” suffix). > Inspecting the logs directory for the first broker should show directories > corresponding to topic partitions that are owned by the second broker. These > are not cleaned up when the re-assignment completes, and also remain in the > logs directory even if the first broker is restarted. Deleting the topic > also does not clean up the topic partitions left behind on the first broker - > which leads to a second potential problem. > For topics that have more than one replica: a new topic that has the same > name as a previously deleted topic might have replicas created on a broker > with “stale” topic partition directories. If this happens these topics will > remain in an under-replicated state. 
> A minimal re-create for this is as follows: > # Create a three node Kafka cluster (backed by ZK) based off the sample > config (to avoid confusion let’s call these kafka-0, kafka-1, and kafka-2) > # Create 100 topics: each with 1 partition, and with replication factor 2 > # Submit a re-assignment to move all of the topic partition replicas to > kafka-0 and kafka-1, and wait for it to complete > # Submit a re-assignment to move all of the topic partition replicas on > kafka-0 to kafka-2. > # While this re-assignment is taking place shutdown and re-start kafka-0. > # Wait for the re-assignment to complete, and check that there are unexpected > topic partition directories in kafka-0’s logs directory > # Delete all 100 topics, and re-create 100 new topics with the same name and > configuration as the deleted topics. > In this state kafka-1 and kafka-2 continually generate log messages similar > to: > {{[2022-07-14 13:07:49,118] WARN [ReplicaFetcher replicaId=2, leaderId=0, > fetcherId=0] Received INCONSISTENT_TOPIC_ID from the leader for partition > test-039-0. This error may be returned transiently when the partition is > being created or deleted, but it is not expected to persist. > (kafka.server.ReplicaFetcherThread)}} > Topics that have had replicas created on kafka-0 are under-replicated with > kafka-0 missing from the ISR list. Performing a rolling restart of each > broker in turn does not resolve the problem; in fact, more partitions are > listed as under-replicated, and as before kafka-0 is missing from their ISR list. > I also tried to re-create this with Kafka running
[GitHub] [kafka] fvaleri commented on pull request #12401: Minor: replace .kafka with .log in implementation documentation
fvaleri commented on PR #12401: URL: https://github.com/apache/kafka/pull/12401#issuecomment-1188894824 @showuon is it fine? Do you have any more feedback on this?
[GitHub] [kafka] divijvaidya commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
divijvaidya commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1188878088 @ijuma, would you like to take a look at this, or perhaps tag a committer who would be best suited to look into this one? (Note that we can review this PR through the lens of changing the file preallocation logic to use the new Java NIO.2 APIs. The new APIs are better at handling Windows, which is nice.)
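For readers who have not looked at the PR, the general idea of preallocating a file through the NIO.2 channel API can be sketched roughly as below. This is only an illustration under stated assumptions and is not the code in PR #12331: the class and method names (`PreallocateSketch`, `openPreallocated`) are hypothetical, and the approach of growing the file by writing a single byte at the last offset is one possible technique, which may produce a sparse file on some filesystems.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical helper, not taken from the Kafka code base. */
final class PreallocateSketch {

    /** Opens (or creates) the file and makes sure it is at least `size` bytes long. */
    static FileChannel openPreallocated(Path path, long size) throws IOException {
        FileChannel channel = FileChannel.open(path,
                StandardOpenOption.CREATE,
                StandardOpenOption.READ,
                StandardOpenOption.WRITE);
        if (size > 0 && channel.size() < size) {
            // A positional write past the current end grows the file to `size` bytes.
            channel.write(ByteBuffer.wrap(new byte[] {0}), size - 1);
        }
        return channel;
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempFile("segment", ".log");
        try (FileChannel channel = openPreallocated(path, 1024L)) {
            System.out.println("size after preallocation: " + channel.size());
        } finally {
            // The channel is already closed here, so the delete also works on Windows.
            Files.deleteIfExists(path);
        }
    }
}
```

The channel is closed before the file is deleted because Windows generally refuses to delete a file that still has an open handle, which is the kind of platform difference this PR discussion is about.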
[GitHub] [kafka] vamossagar12 commented on pull request #12309: KAFKA-14007: Invoking connect headers.close method on shutdown
vamossagar12 commented on PR #12309: URL: https://github.com/apache/kafka/pull/12309#issuecomment-1188874838 @showuon could you plz review/merge this? Thanks!
[GitHub] [kafka] vamossagar12 commented on pull request #12321: KAFKA-14012: Adding null checks for cases when closeQuietly was being passed a lambda object
vamossagar12 commented on PR #12321: URL: https://github.com/apache/kafka/pull/12321#issuecomment-1188874392 @showuon could you plz review/merge this? Thanks!
[GitHub] [kafka] mimaison commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
mimaison commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1188867448 I'm afraid I have no experience running Kafka on Windows so I'm not familiar with the issues on that platform.
[GitHub] [kafka] cadonna commented on a diff in pull request #12387: KAFKA-10199: Add RESUME in state updater
cadonna commented on code in PR #12387: URL: https://github.com/apache/kafka/pull/12387#discussion_r924277101
## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ##
@@ -723,6 +723,106 @@ private void shouldNotPauseTaskInRemovedTasks(final Task task) throws Exception
         verifyPausedTasks();
     }
+    @Test
+    public void shouldResumeActiveStatefulTask() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldResumeStatefulTask(task);
+        verify(changelogReader, times(2)).enforceRestoreActive();
+    }
+
+    @Test
+    public void shouldResumeStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldResumeStatefulTask(task);
+        verify(changelogReader, times(2)).transitToUpdateStandby();
+    }
+
+    private void shouldResumeStatefulTask(final Task task) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyUpdatingTasks();
+
+        stateUpdater.resume(task.id());
+
+        verifyPausedTasks();
+        verifyUpdatingTasks(task);
+    }
+
+    @Test
+    public void shouldIgnoreResumingNotPausedTasks() throws Exception {
Review Comment:
I would rather call this `shouldNotResumeNotExistingTasks()`.
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568439#comment-17568439 ]
ASF GitHub Bot commented on KAFKA-13868:
showuon commented on PR #420: URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1188822349
@divijvaidya , I see. But in https://fonts.googleapis.com/css?family=Cutive+Mono%7CRoboto:100,300,400,700,900%7CRoboto+Condensed:300 , we can download all the fonts in `woff2` format via the link inside the font CSS, e.g.:
```
/* latin-ext */
@font-face {
  font-family: 'Cutive Mono';
  font-style: normal;
  font-weight: 400;
  src: url(https://fonts.gstatic.com/s/cutivemono/v14/m8JWjfRfY7WVjVi2E-K9H6RMTm6o39ucNvc.woff2) format('woff2');
  unicode-range: U+0100-024F, U+0259, U+1E00-1EFF, U+2020, U+20A0-20AB, U+20AD-20CF, U+2113, U+2C60-2C7F, U+A720-A7FF;
}
```
We can download the font via https://fonts.gstatic.com/s/cutivemono/v14/m8JWjfRfY7WVjVi2E-K9H6RMTm6o39ucNvc.woff2 .
However, I don't insist that we should download the font from Google. I just want to make sure we don't miss any fonts we are using on the Kafka website now. So, my next question is, how do we know we only need these 9 fonts you added? I saw there are 44 font CSS entries in this link: https://fonts.googleapis.com/css?family=Cutive+Mono%7CRoboto:100,300,400,700,900%7CRoboto+Condensed:300 . Does that mean we actually only use 9 of them? How do you know that?
Thank you again for helping with this.
[GitHub] [kafka] cadonna commented on a diff in pull request #12387: KAFKA-10199: Add RESUME in state updater
cadonna commented on code in PR #12387: URL: https://github.com/apache/kafka/pull/12387#discussion_r924276012
## streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java: ##
@@ -428,6 +456,113 @@ private void shouldRemoveStatefulTask(final Task task) throws Exception {
         verify(changelogReader).unregister(task.changelogPartitions());
     }
+    @Test
+    public void shouldPauseActiveStatefulTask() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, never()).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldPauseStatefulTask(task);
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    @Test
+    public void shouldPauseActiveTaskAndTransitToUpdateStandby() throws Exception {
+        final StreamTask task1 = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        final StandbyTask task2 = createStandbyTaskInStateRunning(TASK_0_1, Collections.singletonList(TOPIC_PARTITION_B_0));
+
+        stateUpdater.start();
+        stateUpdater.add(task1);
+        stateUpdater.add(task2);
+
+        stateUpdater.pause(task1.id());
+
+        verifyPausedTasks(task1);
+        verifyCheckpointTasks(true, task1);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks(task2);
+        verifyExceptionsAndFailedTasks();
+        verify(changelogReader, times(1)).enforceRestoreActive();
+        verify(changelogReader, times(1)).transitToUpdateStandby();
+    }
+
+    private void shouldPauseStatefulTask(final Task task) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyCheckpointTasks(true, task);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+        verifyExceptionsAndFailedTasks();
+    }
+
+    @Test
+    public void shouldResumeActiveStatefulTask() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldResumeStatefulTask(task);
+        verify(changelogReader, times(2)).enforceRestoreActive();
+    }
+
+    @Test
+    public void shouldResumeStandbyTask() throws Exception {
+        final StandbyTask task = createStandbyTaskInStateRunning(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        shouldResumeStatefulTask(task);
+        verify(changelogReader, times(2)).transitToUpdateStandby();
+    }
+
+    private void shouldResumeStatefulTask(final Task task) throws Exception {
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyUpdatingTasks();
+
+        stateUpdater.resume(task.id());
+
+        verifyPausedTasks();
+        verifyUpdatingTasks(task);
+    }
+
+    @Test
+    public void shouldRemovePausedTask() throws Exception {
+        final StreamTask task = createActiveStatefulTaskInStateRestoring(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0));
+        stateUpdater.start();
+        stateUpdater.add(task);
+
+        stateUpdater.pause(task.id());
+
+        verifyPausedTasks(task);
+        verifyCheckpointTasks(true, task);
+        verifyRestoredActiveTasks();
+        verifyRemovedTasks();
+        verifyUpdatingTasks();
+
+        stateUpdater.remove(task.id());
+
+        verifyPausedTasks();
+        verifyRemovedTasks(task);
+        verifyRestoredActiveTasks();
+        verifyUpdatingTasks();
+
+        stateUpdater.resume(task.id());
+
+        verifyPausedTasks();
+        verifyRemovedTasks(task);
+        verifyRestoredActiveTasks();
+        verifyUpdatingTasks();
+    }
Review Comment:
I was rather thinking about verifying that removed tasks, failed tasks, and restored tasks cannot be resumed. That means without pausing them beforehand. You just ensure that a task failed and then you try to resume it. The same applies to restored and removed tasks.
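The behaviour the reviewer asks to be tested (resuming a task that was never paused should have no effect) can be illustrated with a toy model. This is a sketch under stated assumptions, not the `DefaultStateUpdater` implementation: `ToyStateUpdater` and its methods are hypothetical, task IDs are plain strings, there is no background thread, and treating `resume` on a non-paused task as a silent no-op is an assumption based on the review comment above.

```java
import java.util.LinkedHashSet;
import java.util.Set;

/** Toy model only; the names and semantics here are assumptions for illustration. */
final class ToyStateUpdater {
    private final Set<String> updating = new LinkedHashSet<>();
    private final Set<String> paused = new LinkedHashSet<>();
    private final Set<String> removed = new LinkedHashSet<>();

    void add(String taskId) {
        updating.add(taskId);
    }

    /** Only a task that is currently updating can be paused. */
    void pause(String taskId) {
        if (updating.remove(taskId)) {
            paused.add(taskId);
        }
    }

    /** Only a paused task can be resumed; anything else is ignored. */
    void resume(String taskId) {
        if (paused.remove(taskId)) {
            updating.add(taskId);
        }
    }

    /** A task can be removed whether it is updating or paused. */
    void remove(String taskId) {
        if (updating.remove(taskId) || paused.remove(taskId)) {
            removed.add(taskId);
        }
    }

    boolean isUpdating(String taskId) { return updating.contains(taskId); }
    boolean isPaused(String taskId) { return paused.contains(taskId); }
    boolean isRemoved(String taskId) { return removed.contains(taskId); }
}

final class ToyStateUpdaterDemo {
    public static void main(String[] args) {
        ToyStateUpdater updater = new ToyStateUpdater();
        updater.add("0_0");
        updater.remove("0_0");
        updater.resume("0_0"); // resuming a removed, never-paused task changes nothing
        System.out.println("updating=" + updater.isUpdating("0_0")
                + " removed=" + updater.isRemoved("0_0"));
    }
}
```

A test in the spirit of the review comment would put a task directly into the removed (or failed, or restored) state and then call `resume`, rather than pausing it first.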
[GitHub] [kafka] divijvaidya commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
divijvaidya commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1188795126 @mimaison perhaps you might be interested in looking at this one? I have done an initial review.
[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer
ijuma commented on code in PR #12365: URL: https://github.com/apache/kafka/pull/12365#discussion_r924206144
## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ##
@@ -1491,20 +1495,25 @@ public void setPartition(int partition) {
             if (log.isTraceEnabled()) {
                 // Log the message here, because we don't know the partition before that.
-                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, topic, partition);
             }
+
+            // Reset record to null here so that it doesn't have to be alive as long as the batch is.
+            record = null;
Review Comment:
I find that the code complexity to achieve this trace logging is a bit high. I have some ideas on how to improve it, but we can leave that for later. A simple suggestion for now would be to change `setPartition` to `onPartitionAssigned` or something like that. This would indicate a general callback that can do anything once the partition is known.
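The suggestion above can be sketched as follows. This is a minimal illustration, not the `KafkaProducer` code: `DemoRecord` and `AppendCallbackSketch` are hypothetical stand-ins, and only the method name `onPartitionAssigned` and the idea of dropping the record reference once the partition is known come from the review thread.

```java
/** Hypothetical stand-in for a producer record. */
final class DemoRecord {
    final String topic;
    final String value;

    DemoRecord(String topic, String value) {
        this.topic = topic;
        this.value = value;
    }
}

/** Hypothetical callback that is notified once the partition is known. */
final class AppendCallbackSketch {
    private DemoRecord record;   // only needed until the partition is known
    private final String topic;  // copied up front so the record can be released
    private int partition = -1;
    private String lastTrace;

    AppendCallbackSketch(DemoRecord record) {
        this.record = record;
        this.topic = record.topic;
    }

    /** General hook: it can do anything that needs the partition, here tracing. */
    void onPartitionAssigned(int partition) {
        this.partition = partition;
        lastTrace = "Attempting to append record " + record.value
                + " to topic " + topic + " partition " + partition;
        // Drop the reference so the record does not stay alive as long as the callback does.
        record = null;
    }

    boolean holdsRecord() { return record != null; }
    int partition() { return partition; }
    String lastTrace() { return lastTrace; }
}

final class PartitionCallbackDemo {
    public static void main(String[] args) {
        AppendCallbackSketch callback = new AppendCallbackSketch(new DemoRecord("nicolas", "test"));
        callback.onPartitionAssigned(0);
        System.out.println(callback.lastTrace());
        System.out.println("record still referenced: " + callback.holdsRecord());
    }
}
```

Naming the hook after the event rather than after a setter makes it less surprising that it both logs and releases the record.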
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568411#comment-17568411 ] ASF GitHub Bot commented on KAFKA-13868: divijvaidya commented on PR #420: URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1188739303 > @divijvaidya , why do you use https://google-webfonts-helper.herokuapp.com/fonts/ to download the fonts, instead of the original link: [https://fonts.googleapis.com/css?family=Cutive+Mono|Roboto:100,300,400,700,900|Roboto+Condensed:300](https://fonts.googleapis.com/css?family=Cutive+Mono%7CRoboto:100,300,400,700,900%7CRoboto+Condensed:300) ? Will that make any difference? Hey @showuon, if you visit the link https://fonts.googleapis.com/css?family=Cutive+Mono%7CRoboto:100,300,400,700,900%7CRoboto+Condensed:300, the `url` still points to Google's CDN and hence does not fulfil our objective of self-hosting the fonts. An alternative could be to download the fonts from the source, https://fonts.google.com/, but it only allows downloading a font family in `ttf` format. In that case, we would need a third-party tool to convert the `ttf` files to `woff2`. Further, that site does not seem to allow (at least I couldn't find the option) downloading fonts for different charsets such as Vietnamese. Note that the helper tool I used, https://google-webfonts-helper.herokuapp.com/fonts/, uses an MIT license and has [10K stars on GitHub](https://github.com/majodev/google-webfonts-helper), so I decided to trust it. > Website updates to satisfy Apache privacy policies > -- > > Key: KAFKA-13868 > URL: https://issues.apache.org/jira/browse/KAFKA-13868 > Project: Kafka > Issue Type: Bug > Components: website >Reporter: Mickael Maison >Assignee: Divij Vaidya >Priority: Critical > > The ASF has updated its privacy policy and all websites must be compliant. 
> The full guidelines can be found in > [https://privacy.apache.org/faq/committers.html] > The Kafka website has a few issues, including: > - It's missing a link to the privacy policy: > [https://privacy.apache.org/policies/privacy-policy-public.html] > - It's using Google Analytics > - It's using Google Fonts > - It's using scripts hosted on Cloudflare CDN > - Embedded videos don't have an image placeholder > As per the email sent to the PMC, all updates have to be done by July 22. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer
ijuma commented on code in PR #12365: URL: https://github.com/apache/kafka/pull/12365#discussion_r924191711 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1465,13 +1465,17 @@ public boolean isDone() { private class AppendCallbacks implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors interceptors; -private final ProducerRecord record; +private ProducerRecord record; +private final String topic; protected int partition = RecordMetadata.UNKNOWN_PARTITION; private AppendCallbacks(Callback userCallback, ProducerInterceptors interceptors, ProducerRecord record) { this.userCallback = userCallback; this.interceptors = interceptors; this.record = record; +// Note a record would be null only if the client application has a bug, but we don't want to +// have NPE here, because the interceptors would not be notified (see .doSend). +topic = record != null ? record.topic() : null; Review Comment: I looked at the test and it seems to check that an exception is thrown? As @junrao said, this can be done by validating what `send` receives instead of polluting the whole codebase. I'm OK if we file a JIRA for that and do it as a separate PR. But we should remove this code when we do that.
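To illustrate the suggestion of validating what `send` receives, here is a minimal sketch. All class names (`SketchRecord`, `SketchProducer`, `SketchCallbacks`) are hypothetical stand-ins rather than the real Kafka client classes, and how interceptors would be notified on failure is deliberately left out. It only shows the general idea: check the argument once at the public entry point so that inner code needs no defensive null checks.

```java
import java.util.Objects;

// Hypothetical stand-in for a producer record; not the real Kafka class.
final class SketchRecord {
    private final String topic;

    SketchRecord(String topic) {
        this.topic = topic;
    }

    String topic() {
        return topic;
    }
}

// Hypothetical inner callback object that relies on the boundary check.
final class SketchCallbacks {
    private final String topic;

    SketchCallbacks(SketchRecord record) {
        // No null check here: send() is assumed to have validated the record.
        this.topic = record.topic();
    }

    String topic() {
        return topic;
    }
}

// Hypothetical producer whose public entry point validates its input.
final class SketchProducer {
    String send(SketchRecord record) {
        // Reject a null record once, at the API boundary.
        Objects.requireNonNull(record, "record must not be null");
        return new SketchCallbacks(record).topic();
    }
}
```

With this shape, a null record fails immediately in `send` with a descriptive message, and the inner class can treat the record as non-null.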
[GitHub] [kafka] ijuma commented on a diff in pull request #12365: KAFKA-14020: Performance regression in Producer
ijuma commented on code in PR #12365: URL: https://github.com/apache/kafka/pull/12365#discussion_r924189300 ## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ## @@ -1491,20 +1495,25 @@ public void setPartition(int partition) { if (log.isTraceEnabled()) { Review Comment: For the line in this method, you could do something like:
```java
if (partition < 0)
    throw new IllegalArgumentException("partition should be positive, but it was " + partition);
```
Which is more informative and idiomatic and checks the more general case that we expect partitions to be positive. But I see that we have sprinkled the same check in other methods. So, having an `assertPartitionIsPositive` would probably be a better approach. In any case, since this code was introduced in a different change, we can file a JIRA and do it as a separate PR. I am happy to discuss more, but we should be clear about terminology. Language level asserts in Java aren't used much. Checking preconditions through API boundaries is useful. Within a given boundary, it's best to use the type system to avoid having noise all over the code.
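The shared helper mentioned in the comment above could look roughly like the sketch below. The holder class `PartitionChecks` and the choice to return the validated value are assumptions made for illustration; only the helper name and the exception message come from the review comment.

```java
// Hypothetical utility class; the name PartitionChecks is an assumption.
final class PartitionChecks {
    private PartitionChecks() {
        // Static helpers only; not meant to be instantiated.
    }

    // Throws if the partition is negative, otherwise returns it unchanged
    // so the call can be used inline in an assignment.
    static int assertPartitionIsPositive(int partition) {
        if (partition < 0)
            throw new IllegalArgumentException("partition should be positive, but it was " + partition);
        return partition;
    }
}
```

A caller could then write `this.partition = PartitionChecks.assertPartitionIsPositive(partition);` instead of repeating the check in each method. Note that, as in the original snippet, zero is accepted, since only negative values are rejected.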
[jira] [Commented] (KAFKA-13868) Website updates to satisfy Apache privacy policies
[ https://issues.apache.org/jira/browse/KAFKA-13868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17568395#comment-17568395 ] ASF GitHub Bot commented on KAFKA-13868: showuon commented on PR #420: URL: https://github.com/apache/kafka-site/pull/420#issuecomment-1188708449 @scott-confluent , could you help review this PR since you added the google font into this project? Thanks.