[GitHub] [kafka] mjsax commented on pull request #7414: MINOR: Fixed a division by 0 scenario
mjsax commented on PR #7414: URL: https://github.com/apache/kafka/pull/7414#issuecomment-1416593923 Addressed via https://github.com/apache/kafka/pull/13175 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #13143: KAFKA-14491: [3/N] Add logical key value segments
mjsax merged PR #13143: URL: https://github.com/apache/kafka/pull/13143
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13021: KAFKA-14468: Implement CommitRequestManager to manage the commit and autocommit requests
guozhangwang commented on code in PR #13021: URL: https://github.com/apache/kafka/pull/13021#discussion_r1096346374 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/GroupStateManager.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.GroupRebalanceConfig; +import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.OffsetCommitRequest; + +import java.util.Objects; +import java.util.Optional; + +public class GroupStateManager { Review Comment: Maybe just call it `GroupState` since it does not do any management, but more of a grouping of multiple primitive states? 
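The reviewer's point is that a class which only bundles a few primitive values reads better as a plain holder than as a "manager". Below is a minimal sketch of what such a `GroupState` could look like. It assumes hypothetical fields (`groupId`, an optional `groupInstanceId`, `generation`, `memberId`) and sentinel values that the quoted diff does not show, so it is not the PR's actual class.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical sketch: a plain holder of group-related primitives with no management logic.
final class GroupState {
    // Sentinels assumed for this sketch (a member that has not joined yet).
    static final int UNKNOWN_GENERATION = -1;
    static final String UNKNOWN_MEMBER_ID = "";

    final String groupId;
    final Optional<String> groupInstanceId;
    final int generation;
    final String memberId;

    GroupState(String groupId, Optional<String> groupInstanceId) {
        this(groupId, groupInstanceId, UNKNOWN_GENERATION, UNKNOWN_MEMBER_ID);
    }

    GroupState(String groupId, Optional<String> groupInstanceId, int generation, String memberId) {
        this.groupId = Objects.requireNonNull(groupId);
        this.groupInstanceId = Objects.requireNonNull(groupInstanceId);
        this.generation = generation;
        this.memberId = Objects.requireNonNull(memberId);
    }

    // Returns a new snapshot rather than mutating, since the class only groups values.
    GroupState withGeneration(int newGeneration, String newMemberId) {
        return new GroupState(groupId, groupInstanceId, newGeneration, newMemberId);
    }

    boolean hasJoined() {
        return generation != UNKNOWN_GENERATION;
    }
}
```

Whether the real class should be immutable like this is a design choice. The sketch only illustrates that nothing in it "manages" anything.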
## clients/src/main/java/org/apache/kafka/clients/consumer/internals/DefaultBackgroundThread.java: ## @@ -73,7 +76,9 @@ public class DefaultBackgroundThread extends KafkaThread { final ApplicationEventProcessor processor, final ConsumerMetadata metadata, final NetworkClientDelegate networkClient, -final CoordinatorRequestManager coordinatorManager) { +final GroupStateManager groupState, +final CoordinatorRequestManager coordinatorManager, +CommitRequestManager commitRequestManager) { Review Comment: Why is this not final? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.consumer.internals; + +import org.apache.kafka.clients.ClientResponse; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.RetriableCommitFailedException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetCommitRequestData; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.slf4j.Logger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class CommitRequestManager implements RequestManager { +private final Queue stagedCommits; +// TODO: We will need to refactor the subscriptionState +private final SubscriptionState subscriptionState; +private final Logger log; +private final Optional autoCommitState; +private final CoordinatorRequestManager coordinatorRequestManager; +private final GroupStateManager groupState; Review Comment: I'd suggest we either change the class name to just `GroupState` or name this field `groupStateManager`? Personally I prefer the former. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java: ## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache
[GitHub] [kafka] rondagostino opened a new pull request, #13198: MINOR: Rename IBP_3_4_IV1 as added for KIP-405 to IBP_3_5_IV0
rondagostino opened a new pull request, #13198: URL: https://github.com/apache/kafka/pull/13198 https://github.com/apache/kafka/pull/11390/ will be released as part of AK 3.5 but it added the MetadataVersion `IBP_3_4_IV1`. It should have been `IBP_3_5_IV0`. This change causes no incompatibility because this feature is not yet released. Also, `didMetadataChange` is adjusted to `false` because the KRaft metadata log records are unaffected. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
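Roughly, `didMetadataChange` records whether a metadata version changes the KRaft metadata log records, which is why the PR can set it to `false` here. The simplified, hypothetical enum below is not Kafka's `MetadataVersion`. It shows one way such a flag might be consulted when moving between versions. Only the `IBP_3_5_IV0` flag value comes from the PR; the other flag values are placeholders.

```java
// Hypothetical, simplified stand-in for a metadata-version enum; not Kafka's MetadataVersion.
enum SketchMetadataVersion {
    IBP_3_3_IV3(true),  // placeholder flag value, for illustration only
    IBP_3_4_IV0(true),  // placeholder flag value, for illustration only
    IBP_3_5_IV0(false); // per the PR: KRaft metadata log records are unaffected

    private final boolean didMetadataChange;

    SketchMetadataVersion(boolean didMetadataChange) {
        this.didMetadataChange = didMetadataChange;
    }

    boolean didMetadataChange() {
        return didMetadataChange;
    }

    // Assumed rule: moving between two versions touches the metadata format only if some
    // version after the lower one, up to and including the higher one, has the flag set.
    static boolean metadataChangedBetween(SketchMetadataVersion a, SketchMetadataVersion b) {
        int lo = Math.min(a.ordinal(), b.ordinal());
        int hi = Math.max(a.ordinal(), b.ordinal());
        SketchMetadataVersion[] all = values();
        for (int i = lo + 1; i <= hi; i++) {
            if (all[i].didMetadataChange()) {
                return true;
            }
        }
        return false;
    }
}
```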
[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR
[ https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17684045#comment-17684045 ] Calvin Liu commented on KAFKA-14139: Hi Alex, just posted the KIP. Let me know if you have any comments. https://cwiki.apache.org/confluence/display/KAFKA/KIP-903%3A+Replicas+with+stale+broker+epoch+should+not+be+allowed+to+join+the+ISR > Replaced disk can lead to loss of committed data even with non-empty ISR > > > Key: KAFKA-14139 > URL: https://issues.apache.org/jira/browse/KAFKA-14139 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Calvin Liu >Priority: Major > Fix For: 3.5.0 > > > We have been thinking about disk failure cases recently. Suppose that a disk > has failed and the user needs to restart the disk from an empty state. The > concern is whether this can lead to the unnecessary loss of committed data. > For normal topic partitions, removal from the ISR during controlled shutdown > buys us some protection. After the replica is restarted, it must prove its > state to the leader before it can be added back to the ISR. And it cannot > become a leader until it does so. > An obvious exception to this is when the replica is the last member in the > ISR. In this case, the disk failure itself has compromised the committed > data, so some amount of loss must be expected. > We have been considering other scenarios in which the loss of one disk can > lead to data loss even when there are replicas remaining which have all of > the committed entries. One such scenario is this: > Suppose we have a partition with two replicas: A and B. Initially A is the > leader and it is the only member of the ISR. > # Broker B catches up to A, so A attempts to send an AlterPartition request > to the controller to add B into the ISR. > # Before the AlterPartition request is received, replica B has a hard > failure. > # The current controller successfully fences broker B. 
It takes no action on > this partition since B is already out of the ISR. > # Before the controller receives the AlterPartition request to add B, it > also fails. > # While the new controller is initializing, suppose that replica B finishes > startup, but the disk has been replaced (all of the previous state has been > lost). > # The new controller sees the registration from broker B first. > # Finally, the AlterPartition from A arrives which adds B back into the ISR > even though it has an empty log. > (Credit for coming up with this scenario goes to [~junrao] .) > I tested this in KRaft and confirmed that this sequence is possible (even if > perhaps unlikely). There are a few ways we could have potentially detected > the issue. First, perhaps the leader should have bumped the leader epoch on > all partitions when B was fenced. Then the inflight AlterPartition would be > doomed no matter when it arrived. > Alternatively, we could have relied on the broker epoch to distinguish the > dead broker's state from that of the restarted broker. This could be done by > including the broker epoch in both the `Fetch` request and in > `AlterPartition`. > Finally, perhaps even normal kafka replication should be using a unique > identifier for each disk so that we can reliably detect when it has changed. > For example, something like what was proposed for the metadata quorum here: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes]. -- This message was sent by Atlassian Jira (v8.20.10#820010)
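The broker-epoch option discussed above (the direction KIP-903 takes, judging by its title) can be sketched as a controller-side check. In this minimal, hypothetical model, the controller remembers the epoch each broker received at its latest registration. It rejects a proposed ISR if any replica carries a different epoch. How the epoch travels in `Fetch` and `AlterPartition` is abstracted away, and the class names are illustrative only.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical: a proposed ISR member tagged with the broker epoch the leader observed.
final class IsrCandidate {
    final int brokerId;
    final long brokerEpoch;

    IsrCandidate(int brokerId, long brokerEpoch) {
        this.brokerId = brokerId;
        this.brokerEpoch = brokerEpoch;
    }
}

// Hypothetical controller-side guard: refuse ISR changes that carry a stale broker epoch.
final class BrokerEpochGuard {
    // Latest epoch assigned to each broker when it (re)registered.
    private final Map<Integer, Long> registeredEpochs = new HashMap<>();

    void register(int brokerId, long brokerEpoch) {
        registeredEpochs.put(brokerId, brokerEpoch);
    }

    boolean acceptIsr(List<IsrCandidate> proposedIsr) {
        for (IsrCandidate c : proposedIsr) {
            Long current = registeredEpochs.get(c.brokerId);
            // Unknown broker, or an epoch from a previous incarnation: reject the change.
            if (current == null || current.longValue() != c.brokerEpoch) {
                return false;
            }
        }
        return true;
    }
}
```

Replaying the scenario, A's in-flight request still carries B's pre-failure epoch. Once B re-registers with a new epoch, that stale ISR expansion is refused.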
[GitHub] [kafka] mumrah opened a new pull request, #13197: Minor: Decode the envelope requests for the request log
mumrah opened a new pull request, #13197: URL: https://github.com/apache/kafka/pull/13197 When looking at the request log for anything forwarded to the controller, all we see is ENVELOPE and the base64-encoded bytes of the request body. This patch decodes the envelope request so it can be fully logged in the request log. Here is a sample output before:
> [2023-02-03 16:29:55,747] DEBUG Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":58,"requestApiVersion":0,"correlationId":6,"clientId":"1","requestApiKeyName":"ENVELOPE"},"request":{"requestData":"ACwAAQMADWFkbWluY2xpZW50LTIAAgIFdGVzdAMOc2VnbWVudC5ieXRlcwAHMjA0ODAwAAtzZWdtZW50Lm1zAQAA","requestPrincipal":"AAAFVXNlcgpBTk9OWU1PVVMAAA==","clientHostAddress":"fwAAAQ=="},"response":{"responseData":null,"errorCode":41},"connection":"127.0.0.1:51244-127.0.0.1:51264-1","totalTimeMs":0.362,"requestQueueTimeMs":0.096,"localTimeMs":0.12,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.051,"sendTimeMs":0.095,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"EXTERNAL","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"unknown"}} (kafka.request.logger:275)
and after:
> [2023-02-03 16:25:17,606] DEBUG Completed request:{"isForwarded":false,"requestHeader":{"requestApiKey":58,"requestApiVersion":0,"correlationId":0,"clientId":"0","requestApiKeyName":"ENVELOPE"},"request":{"envelopeRequestHeader":{"requestApiKey":44,"requestApiVersion":1,"correlationId":3,"clientId":"adminclient-2","requestApiKeyName":"INCREMENTAL_ALTER_CONFIGS"},"envelopeRequest":{"resources":[{"resourceType":2,"resourceName":"test","configs":[{"name":"segment.bytes","configOperation":0,"value":"204800"},{"name":"segment.ms","configOperation":1,"value":null}]}],"validateOnly":false}},"response":{"responseData":null,"errorCode":41},"connection":"127.0.0.1:50795-127.0.0.1:50813-0","totalTimeMs":4.067,"requestQueueTimeMs":1.86,"localTimeMs":1.927,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.096,"sendTimeMs":0.183,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"EXTERNAL","clientInformation":{"softwareName":"apache-kafka-java","softwareVersion":"unknown"}} (kafka.request.logger:275)
This change slightly increases the logging overhead since it makes a copy of the request buffer and runs through the parsing logic. However, this seems like a reasonable tradeoff for better visibility.
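The `requestData` field in the "before" sample holds the serialized forwarded request. A Kafka request header begins with an int16 api key followed by an int16 api version, both big-endian. A rough JDK-only illustration (not how the patch decodes the envelope) therefore reads those two fields from the base64 string. It yields api key 44 and version 1, the same values the "after" sample reports in `envelopeRequestHeader`. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical helper: peek at the request header embedded in an envelope's base64 requestData.
final class EnvelopePeek {
    // Reads the leading int16 api key and int16 api version (ByteBuffer is big-endian by default).
    static short[] apiKeyAndVersion(String base64RequestData) {
        ByteBuffer buf = ByteBuffer.wrap(Base64.getDecoder().decode(base64RequestData));
        short apiKey = buf.getShort();
        short apiVersion = buf.getShort();
        return new short[] {apiKey, apiVersion};
    }

    public static void main(String[] args) {
        // requestData copied from the "before" log sample.
        String requestData = "ACwAAQMADWFkbWluY2xpZW50LTIAAgIFdGVzdAMOc2VnbWVudC5ieXRlcwAHMjA0ODAwAAtzZWdtZW50Lm1zAQAA";
        short[] header = apiKeyAndVersion(requestData);
        System.out.println("apiKey=" + header[0] + " apiVersion=" + header[1]); // prints apiKey=44 apiVersion=1
    }
}
```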
[GitHub] [kafka] dajac commented on a diff in pull request #13195: Minor: Add JmxTool note to 3.5.0 notable changes
dajac commented on code in PR #13195: URL: https://github.com/apache/kafka/pull/13195#discussion_r1096238301 ## docs/upgrade.html: ## @@ -19,6 +19,16 @@
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments
vcrfxia commented on code in PR #13143: URL: https://github.com/apache/kafka/pull/13143#discussion_r1096168089 ## streams/src/test/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegmentTest.java: ## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.InvalidStateStoreException; +import org.apache.kafka.streams.processor.StateStoreContext; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.StreamsTestUtils; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class LogicalKeyValueSegmentTest { + +private static final String STORE_NAME = "physical-rocks"; +private static final String METRICS_SCOPE = "metrics-scope"; +private static final String DB_FILE_DIR = "rocksdb"; +private static final Serializer STRING_SERIALIZER = new StringSerializer(); +private static final Deserializer STRING_DESERIALIZER = new StringDeserializer(); + +private RocksDBStore physicalStore; + +private LogicalKeyValueSegment segment1; +private LogicalKeyValueSegment segment2; + +@Before +public void setUp() { +physicalStore = new RocksDBStore(STORE_NAME, DB_FILE_DIR, new 
RocksDBMetricsRecorder(METRICS_SCOPE, STORE_NAME), false); +physicalStore.init((StateStoreContext) new InternalMockProcessorContext<>( +TestUtils.tempDirectory(), +Serdes.String(), +Serdes.String(), +new StreamsConfig(StreamsTestUtils.getStreamsConfig()) +), physicalStore); + +segment1 = new LogicalKeyValueSegment(1, "segment-1", physicalStore); +segment2 = new LogicalKeyValueSegment(2, "segment-2", physicalStore); +} + +@After +public void tearDown() { +segment1.close(); +segment2.close(); +physicalStore.close(); +} + +@Test +public void shouldPut() { +final KeyValue<String, String> kv0 = new KeyValue<>("1", "a"); +final KeyValue<String, String> kv1 = new KeyValue<>("2", "b"); + +segment1.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8)); +segment1.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8)); +segment2.put(new Bytes(kv0.key.getBytes(UTF_8)), kv0.value.getBytes(UTF_8)); +segment2.put(new Bytes(kv1.key.getBytes(UTF_8)), kv1.value.getBytes(UTF_8)); + +assertEquals("a", getAndDeserialize(segment1, "1")); Review Comment: Ah good point. That's definitely a gap in `shouldPut()` and `shouldPutAll()`. All of the other tests are already set up in a way that they fail if segments are not properly isolated from each other. Just pushed a fix to the two tests which didn't ensure that, and some minor cleanup to a few of the other tests.
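The gap discussed above is that writing identical keys and values to both segments cannot detect one segment reading the other's data. Below is a minimal sketch of the stronger test pattern, using hypothetical classes. For illustration only, it assumes that logical segments isolate themselves with a key prefix in the shared physical store. The test pattern is to write the same key with different values per segment, then check that each segment sees only its own value.

```java
import java.util.TreeMap;

// Hypothetical stand-in for the single physical store shared by all logical segments.
final class SharedStore {
    final TreeMap<String, String> physical = new TreeMap<>();
}

// Hypothetical logical segment; the prefix scheme is an assumption made for this sketch.
final class PrefixedSegment {
    private final String prefix;
    private final SharedStore store;

    PrefixedSegment(long id, SharedStore store) {
        // Namespace keys as "<id>/<key>"; ids contain no '/', so segments cannot collide.
        this.prefix = id + "/";
        this.store = store;
    }

    void put(String key, String value) {
        store.physical.put(prefix + key, value);
    }

    String get(String key) {
        return store.physical.get(prefix + key);
    }
}
```

A test in this style puts `("1", "a")` into one segment and `("1", "b")` into the other, then asserts that each `get("1")` returns its own value.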
[GitHub] [kafka] Hangleton commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
Hangleton commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1095665461 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManager.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g. + * epoch, sequence number, last offset, etc.) + * + * The sequence number is the last number successfully appended to the partition for the given identifier. + * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message + * appended to the partition. + * + * As long as a producer id is contained in the map, the corresponding producer can continue to write data. 
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from + * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure + * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to + * age. This ensures that producer ids will not be expired until either the max expiration time has been reached, + * or if the topic also is configured for deletion, the segment containing the last written offset has + * been deleted. + */ +public class ProducerStateManager { +private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class.getName()); + +// Remove these once UnifiedLog moves to storage module. +public static final String DELETED_FILE_SUFFIX = ".deleted"; +public static final String PRODUCER_SNAPSHOT_FILE_SUFFIX = ".snapshot"; + +public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000; + +private static final short PRODUCER_SNAPSHOT_VERSION = 1; +private static final String VERSION_FIELD = "version"; +private static final String CRC_FIELD = "crc"; +private static final String PRODUCER_ID_FIELD = "producer_id"; +private static final String LAST_SEQUENCE_FIELD = "last_sequence"; +private static final String PRODUCER_EPOCH_FIELD = "epoch"; +private static final String LAST_OFFSET_FIELD = "last_offset"; +private static final String OFFSET_DELTA_FIELD = "offset_delta"; +private static final String TIMESTAMP_FIELD = "timestamp"; +private static final String PRODUCER_ENTRIES_FIELD = "producer_entries"; +private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch"; +private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = "current_txn_first_offset"; + +private static final int VERSION_OFFSET = 0; +private static final int CRC_OFFSET = VERSION_OFFSET + 2; +private static final int PR
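The class comment above describes a map keyed by producer id. In that map, the epoch fences zombie writers and the sequence number tracks the last successful append. Below is a heavily simplified sketch of that bookkeeping, using hypothetical names and assumed validation rules. It omits expiration, transactions, duplicate detection, and snapshots, all of which the real `ProducerStateManager` handles.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, heavily simplified producer-state bookkeeping; not ProducerStateManager.
final class ProducerStateSketch {
    enum Result { APPENDED, FENCED, OUT_OF_ORDER }

    static final class Entry {
        final short epoch;
        final int lastSequence;
        final long lastOffset;

        Entry(short epoch, int lastSequence, long lastOffset) {
            this.epoch = epoch;
            this.lastSequence = lastSequence;
            this.lastOffset = lastOffset;
        }
    }

    private final Map<Long, Entry> producers = new HashMap<>();

    Result append(long producerId, short epoch, int sequence, long offset) {
        Entry current = producers.get(producerId);
        if (current != null && epoch < current.epoch) {
            return Result.FENCED; // a zombie writer with an older epoch
        }
        // Assumed rule: a new producer or a bumped epoch starts at sequence 0;
        // otherwise the sequence must directly follow the last appended one.
        int expected = (current == null || epoch > current.epoch) ? 0 : current.lastSequence + 1;
        if (sequence != expected) {
            return Result.OUT_OF_ORDER;
        }
        producers.put(producerId, new Entry(epoch, sequence, offset));
        return Result.APPENDED;
    }

    Entry entry(long producerId) {
        return producers.get(producerId);
    }
}
```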
[GitHub] [kafka] hachikuji commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API
hachikuji commented on code in PR #12972: URL: https://github.com/apache/kafka/pull/12972#discussion_r1096118406 ## clients/src/main/java/org/apache/kafka/common/protocol/Errors.java: ## @@ -372,7 +375,10 @@ public enum Errors { FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new), INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new), NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new), -OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new); +OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new), +FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoins.", FencedMemberEpochException::new), Review Comment: nit: rejoin (singular)? ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json: ## @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 68, + "type": "response", + "name": "ConsumerGroupHeartbeatResponse", + "validVersions": "0", + "flexibleVersions": "0+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED + // - NOT_COORDINATOR + // - COORDINATOR_NOT_AVAILABLE + // - COORDINATOR_LOAD_IN_PROGRESS + // - INVALID_REQUEST + // - UNKNOWN_MEMBER_ID + // - FENCED_MEMBER_EPOCH + // - UNSUPPORTED_ASSIGNOR + // - UNRELEASED_INSTANCE_ID + // - GROUP_MAX_SIZE_REACHED + "fields": [ +{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, +{ "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top-level error code, or 0 if there was no error" }, +{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null", Review Comment: I think we have an 'errorMessage' entity type. ## clients/src/main/resources/common/message/ConsumerGroupHeartbeatRequest.json: ## @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 68, + "type": "request", + "listeners": ["zkBroker", "broker"], + "name": "ConsumerGroupHeartbeatRequest", + // The ConsumerGroupHeartbeat API is added as part of KIP-848 and is still + // under development. Hence, the API is not exposed by default by brokers + // unless explicitly enabled. + "latestVersionUnstable": true, + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ +{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", + "about": "The group identifier." }, +{ "name": "MemberId", "type": "string", "versions": "0+", + "about": "The member id generated by the coordinator. The member id must be kept during the entire lifetime of the member." }, +{ "name": "MemberEpoch", "type": "int32", "versions": "0+", + "about": "The current member epoch; 0 to join the group; -1 to leave the group; -2 to indicate that the static member will
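The `MemberEpoch` sentinels and the new `FENCED_MEMBER_EPOCH` error suggest a coordinator-side check along the lines below. This is a hypothetical sketch, not the KIP-848 implementation. It assumes that the client supplies its member id and that joining bumps a counter used as the member's epoch. It does not model `-2`, whose description is cut off above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical coordinator-side sketch of member-epoch handling for a heartbeat.
final class HeartbeatEpochSketch {
    enum Outcome { JOINED, LEFT, OK, FENCED_MEMBER_EPOCH, UNKNOWN_MEMBER_ID, NOT_MODELED }

    static final int JOIN_EPOCH = 0;   // from the MemberEpoch field description
    static final int LEAVE_EPOCH = -1; // from the MemberEpoch field description

    private final Map<String, Integer> memberEpochs = new HashMap<>();
    private int groupEpoch = 0;

    Outcome heartbeat(String memberId, int memberEpoch) {
        if (memberEpoch == JOIN_EPOCH) {
            groupEpoch++; // assumed: joining bumps the epoch handed to the member
            memberEpochs.put(memberId, groupEpoch);
            return Outcome.JOINED;
        }
        if (memberEpoch == LEAVE_EPOCH) {
            return memberEpochs.remove(memberId) != null ? Outcome.LEFT : Outcome.UNKNOWN_MEMBER_ID;
        }
        if (memberEpoch < 0) {
            return Outcome.NOT_MODELED; // e.g. -2, whose meaning is not covered here
        }
        Integer current = memberEpochs.get(memberId);
        if (current == null) {
            return Outcome.UNKNOWN_MEMBER_ID;
        }
        // A mismatched epoch means the member must abandon its partitions and rejoin.
        return current == memberEpoch ? Outcome.OK : Outcome.FENCED_MEMBER_EPOCH;
    }

    Integer epochOf(String memberId) {
        return memberEpochs.get(memberId);
    }
}
```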
[GitHub] [kafka] guozhangwang commented on a diff in pull request #13190: KAFKA-12639: exit upon expired timer to prevent tight looping
guozhangwang commented on code in PR #13190: URL: https://github.com/apache/kafka/pull/13190#discussion_r1096112947 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -501,13 +501,16 @@ boolean joinGroupIfNeeded(final Timer timer) { } if (exception instanceof UnknownMemberIdException || -exception instanceof IllegalGenerationException || -exception instanceof RebalanceInProgressException || -exception instanceof MemberIdRequiredException) +exception instanceof IllegalGenerationException || +exception instanceof RebalanceInProgressException || +exception instanceof MemberIdRequiredException) continue; else if (!future.isRetriable()) throw exception; +if (timer.isExpired()) { Review Comment: Could you add a couple of comments here explaining why we check the timer again here, in addition to the check in line 452 above? Maybe something like this: ``` We check the timer again after calling poll with the timer since it's possible that even after the timer has elapsed, the next client.poll(timer) would immediately return an error response, which would prevent us from exiting the while loop. ``` ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java: ## @@ -501,13 +501,16 @@ boolean joinGroupIfNeeded(final Timer timer) { } if (exception instanceof UnknownMemberIdException || -exception instanceof IllegalGenerationException || -exception instanceof RebalanceInProgressException || -exception instanceof MemberIdRequiredException) +exception instanceof IllegalGenerationException || +exception instanceof RebalanceInProgressException || +exception instanceof MemberIdRequiredException) Review Comment: Should we actually do the timer check before this? Otherwise, if the exception from the immediately returned responses is any of those four, we would still `continue` and skip the check below. 
More concretely, I think we can just move the remaining logic inside the `if` statement:
```
if (!future.isRetriable()) {
    throw exception;
} else {
    if (timer.isExpired()) {
        return false;
    } else if (exception instanceof ..) {
        continue;
    } else {
        timer.sleep(..);
    }
}
```
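The restructuring proposed above can be modeled in isolation. The code below is a hypothetical, self-contained model (fake timer, simplified result kinds), not the `AbstractCoordinator` code. The key point is that the expiry check runs before the branch that retries immediately. As a result, a poll that keeps returning an error once the deadline has passed cannot keep the loop going.

```java
import java.util.function.Supplier;

// Hypothetical model of the join loop under review; names are illustrative only.
final class JoinLoopSketch {
    enum PollResult { SUCCESS, REBALANCE_IN_PROGRESS, OTHER_RETRIABLE, FATAL }

    // Fake timer over a manually advanced clock (stands in for Kafka's Timer).
    static final class FakeTimer {
        private long nowMs;
        private final long deadlineMs;

        FakeTimer(long startMs, long timeoutMs) {
            this.nowMs = startMs;
            this.deadlineMs = startMs + timeoutMs;
        }

        boolean isExpired() {
            return nowMs >= deadlineMs;
        }

        // Advances the clock, never past the deadline.
        void sleep(long ms) {
            nowMs = Math.min(deadlineMs, nowMs + ms);
        }
    }

    // Returns true once a poll succeeds, false if the timer expires first.
    static boolean joinGroupIfNeeded(FakeTimer timer, Supplier<PollResult> poll) {
        while (true) {
            PollResult result = poll.get();
            if (result == PollResult.SUCCESS) {
                return true;
            }
            if (result == PollResult.FATAL) {
                throw new IllegalStateException("non-retriable failure");
            }
            // Check expiry before any retry branch, so an error returned right after
            // the deadline cannot send the loop around again.
            if (timer.isExpired()) {
                return false;
            }
            if (result == PollResult.REBALANCE_IN_PROGRESS) {
                continue; // retry immediately, like the four listed exception types
            }
            timer.sleep(100); // back off before retrying other retriable errors
        }
    }
}
```

In the usage below, each poll consumes 30 ms and always reports a rebalance in progress. With a 100 ms timeout the loop gives up after the fourth poll instead of spinning.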
[GitHub] [kafka] jeqo commented on a diff in pull request #13177: [KAFKA-14441] Benchmark performance impact of metrics library
jeqo commented on code in PR #13177: URL: https://github.com/apache/kafka/pull/13177#discussion_r1096076032 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/metrics/SensorBenchmark.java: ## @@ -0,0 +1,145 @@ +package org.apache.kafka.jmh.common.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) Review Comment: Sure! 
I have added SampleTime, and got some interesting stats:
```
Benchmark                                       (numSensors)  Mode  Cnt      Score      Error  Units
SensorBenchmark.recordBenchmark                            1  avgt    4    122.285 ±    3.831  ns/op
SensorBenchmark.recordBenchmark                            5  avgt    4    535.055 ±   10.002  ns/op
SensorBenchmark.recordBenchmark                           20  avgt    4   2153.720 ±  562.511  ns/op
SensorBenchmark.recordWithAvgBenchmark                     1  avgt    4    148.655 ±    7.245  ns/op
SensorBenchmark.recordWithAvgBenchmark                     5  avgt    4    801.836 ±   25.243  ns/op
SensorBenchmark.recordWithAvgBenchmark                    20  avgt    4   5934.782 ±  463.325  ns/op
SensorBenchmark.recordWithMaxAvgBenchmark                  1  avgt    4    153.731 ±    8.982  ns/op
SensorBenchmark.recordWithMaxAvgBenchmark                  5  avgt    4   1067.862 ±   68.656  ns/op
SensorBenchmark.recordWithMaxAvgBenchmark                 20  avgt    4  10486.323 ±  209.097  ns/op
SensorBenchmark.recordWithMaxBenchmark                     1  avgt    4    148.492 ±   20.097  ns/op
SensorBenchmark.recordWithMaxBenchmark                     5  avgt    4    807.746 ±  300.785  ns/op
SensorBenchmark.recordWithMaxBenchmark                    20  avgt    4   6361.096 ± 1160.907  ns/op
SensorBenchmark.recordWithMinMaxAvgBenchmark               1  avgt    4    178.676 ±   12.371  ns/op
SensorBenchmark.recordWithMinMaxAvgBenchmark               5  avgt    4   1749.455 ±  204.069  ns/op
SensorBenchmark.recordWithMinMaxAvgBenchmark              20  avgt    4  25027.613 ± 1407.740  ns/op
SensorBenchmark.recordWithMinMaxBenchmark                  1  avgt    4    158.032 ±    9.249  ns/op
SensorBenchmark.recordWithMinMaxBenchmark                  5  avgt    4   1001.310 ±   48.258  ns/op
SensorBenchmark.recordWithMinMaxBenchmark                 20  avgt    4   8958.896 ±  264.169  ns/op
SensorBenchmark.recordWithPercentileBenchmark              1  avgt    4    133.637 ±    7.675  ns/op
SensorBenchmark.recordWithPercentileBenchmark              5  avgt    4    766.876 ±   14.212  ns/op
SensorBenchmark.recordWithPercentileBenchmark             20  avgt    4   5661.163 ±  463.749  ns/op
SensorBenchmark.recordWithWindowedSumBenchmark             1  avgt    4    119.896 ±   13.392  ns/op
SensorBenchmark.recordWithWindowedSumBenchmark             5  avgt    4    663.232 ±    9.704
```
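Assuming each benchmark op records once on every one of the `numSensors` sensors (plausible given the `@Param({"1", "5", "20"})` sensor count, but not confirmed by the benchmark code quoted here), dividing a score by `numSensors` gives a rough per-sensor cost. A small sketch of that arithmetic for two of the benchmarks in the results above; `PerSensorCost` is a hypothetical helper.

```java
// Scores copied from the recordBenchmark and recordWithMinMaxAvgBenchmark rows (ns/op, avgt mode).
final class PerSensorCost {
    // Assumes one op records once per sensor, so cost per sensor = score / numSensors.
    static double nsPerSensor(double scoreNsPerOp, int numSensors) {
        return scoreNsPerOp / numSensors;
    }

    public static void main(String[] args) {
        int[] sensors = {1, 5, 20};
        double[] plain = {122.285, 535.055, 2153.720};
        double[] minMaxAvg = {178.676, 1749.455, 25027.613};
        for (int i = 0; i < sensors.length; i++) {
            System.out.printf("numSensors=%d  record=%.1f ns/sensor  record+min/max/avg=%.1f ns/sensor%n",
                sensors[i], nsPerSensor(plain[i], sensors[i]), nsPerSensor(minMaxAvg[i], sensors[i]));
        }
    }
}
```

For these rows, plain recording stays at roughly 107 to 122 ns per sensor, while the Min/Max/Avg variant grows from about 179 ns to about 1251 ns per sensor as `numSensors` goes from 1 to 20; the numbers alone do not explain why.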
[GitHub] [kafka] jeqo commented on a diff in pull request #13177: [KAFKA-14441] Benchmark performance impact of metrics library
jeqo commented on code in PR #13177: URL: https://github.com/apache/kafka/pull/13177#discussion_r1096074802 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/metrics/SensorBenchmark.java: ## @@ -0,0 +1,145 @@ +package org.apache.kafka.jmh.common.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) Review Comment: > Unsure how representative the benchmark is for high-throughput invocations in production environments. Yeah, I don't think these are highly representative, but they are a starting point to compare stats and to see how adding either sensors or stats affects latency (which is the concern from KIP-864). That said, I'm happy to hear thoughts on how to make these more solid. For example, I have an idea to also add a benchmark of Yammer metrics to compare the impact of these two libraries. I also found some existing work on this: - [MetricsBench](https://github.com/apache/kafka/blob/a8aedc85ebfadcf1472acafe2e0311a73d3040be/clients/src/test/java/org/apache/kafka/test/MetricsBench.java): simple average-time execution of a sensor plus a bunch of stats.
- [Microbenchmarks](https://github.com/apache/kafka/blob/00e5803cd3af89011254e734232308618403309d/clients/src/test/java/org/apache/kafka/test/Microbenchmarks.java), which seems more elaborate, but I haven't looked into the details yet.
[GitHub] [kafka] jeqo commented on a diff in pull request #13177: [KAFKA-14441] Benchmark performance impact of metrics library
jeqo commented on code in PR #13177: URL: https://github.com/apache/kafka/pull/13177#discussion_r1096069586 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/metrics/SensorBenchmark.java: ## @@ -0,0 +1,145 @@ +package org.apache.kafka.jmh.common.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) Review Comment: Sure, I reduced it for testing purposes. I'm increasing it to 15, similar to other benchmarks, since I realized these can be overridden via args anyway.
[GitHub] [kafka] jeqo commented on a diff in pull request #13177: [KAFKA-14441] Benchmark performance impact of metrics library
jeqo commented on code in PR #13177: URL: https://github.com/apache/kafka/pull/13177#discussion_r1096068517 ## jmh-benchmarks/src/main/java/org/apache/kafka/jmh/common/metrics/SensorBenchmark.java: ## @@ -0,0 +1,145 @@ +package org.apache.kafka.jmh.common.metrics; + +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.metrics.stats.Avg; +import org.apache.kafka.common.metrics.stats.Max; +import org.apache.kafka.common.metrics.stats.Min; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 5) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class SensorBenchmark { + +@State(Scope.Benchmark) +public static class SensorWithNoStats { + +Metrics metrics = new Metrics(); + +List sensors = new ArrayList<>(); +@Param({"1", "5", "20"}) Review Comment: Yes, e.g. a sink task iteration has around 5 sensor records. I'm adding a comment to clarify this. Thanks!
[jira] [Assigned] (KAFKA-7735) StateChangeLogMerger tool can not work due to incorrect topic regular matches
[ https://issues.apache.org/jira/browse/KAFKA-7735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Federico Valeri reassigned KAFKA-7735: -- Assignee: Federico Valeri > StateChangeLogMerger tool can not work due to incorrect topic regular matches > - > > Key: KAFKA-7735 > URL: https://issues.apache.org/jira/browse/KAFKA-7735 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 2.0.0 >Reporter: Fangbin Sun >Assignee: Federico Valeri >Priority: Major > > When the StateChangeLogMerger tool tries to obtain a topic's state-change log, it > returns nothing. > {code:java} > bin/kafka-run-class.sh com.cmss.kafka.api.StateChangeLogMerger --logs state-change.log --topic test{code} > This tool uses a topic partition regex as follows: > {code:java} > val topicPartitionRegex = new Regex("\\[(" + Topic.LEGAL_CHARS + "+),( )*([0-9]+)\\]"){code} > However, the state-change log no longer prints entries in the above format. For example, > in 0.10.2.0, it prints some state-change logs via the case class TopicAndPartition, > whose toString is overridden as follows: > {code:java} > override def toString = "[%s,%d]".format(topic, partition){code} > In newer versions (e.g. 1.0.0+) it prints most state-change logs in the > form of "partition $topic-$partition". As a workaround, one can modify the > topic partition regex like: > {code:java} > val topicPartitionRegex = new Regex("(partition " + Topic.LEGAL_CHARS + "+)-([0-9]+)"){code} > and match the topic with "matcher.group(1).substring(10)"; however, some output of > state changes might be a little bit redundant. -- This message was sent by Atlassian Jira (v8.20.10#820010)
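A minimal sketch of matching both log formats described in this issue, so that neither the old `[topic,partition]` form nor the newer `partition topic-partition` form is missed, and without the `substring(10)` workaround. It assumes `LEGAL_CHARS` mirrors `Topic.LEGAL_CHARS` (`[a-zA-Z0-9._-]`); `StateChangeTopicMatcher` is a hypothetical name, and this is not the code that was eventually merged.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class StateChangeTopicMatcher {
    // Assumed to mirror org.apache.kafka.common.internals.Topic.LEGAL_CHARS.
    static final String LEGAL_CHARS = "[a-zA-Z0-9._-]";

    // Pre-1.0 format, e.g. "[test,0]" (optionally with spaces after the comma).
    static final Pattern OLD_FORMAT = Pattern.compile("\\[(" + LEGAL_CHARS + "+), *([0-9]+)\\]");

    // 1.0+ format, e.g. "partition test-0". The greedy topic group backtracks to the
    // last '-' that is followed by digits, so topics containing '-' are split correctly.
    static final Pattern NEW_FORMAT = Pattern.compile("partition (" + LEGAL_CHARS + "+)-([0-9]+)");

    // Returns "topic:partition" for the first match in the line, trying the newer format first.
    static Optional<String> topicPartition(String line) {
        for (Pattern p : new Pattern[] {NEW_FORMAT, OLD_FORMAT}) {
            Matcher m = p.matcher(line);
            if (m.find()) {
                return Optional.of(m.group(1) + ":" + m.group(2));
            }
        }
        return Optional.empty();
    }
}
```

Capturing the topic in its own group means the caller never has to strip the `partition ` prefix by character offset.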
[GitHub] [kafka] fvaleri commented on pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
fvaleri commented on PR #13171: URL: https://github.com/apache/kafka/pull/13171#issuecomment-1416167959 @mimaison thanks for spotting this! I saw that ticket, but I started testing with an old log file and then forgot to also test with a recent log format. I agree that this tool is not used much, but maybe the logic can be reused to create something similar. With the latest commit, I made it work with the latest state change log format and addressed your comments. Before merging, we need to rebase on https://github.com/apache/kafka/pull/13195, so that I can also add this tool to the notable changes note.
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Christo Lolov updated KAFKA-14133: -- Description: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}In Review{color} {color:#00875a}Merged{color} # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}RootResourceTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}ByteArrayProducerRecordEquals{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}KStreamFlatTransformTest{color} (owner: Christo) # {color:#00875a}KStreamFlatTransformValuesTest{color} (owner: Christo) # {color:#00875a}KStreamPrintTest{color} (owner: Christo) # {color:#00875a}KStreamRepartitionTest{color} (owner: Christo) # {color:#00875a}MaterializedInternalTest{color} (owner: Christo) # {color:#00875a}TransformerSupplierAdapterTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorMetricsTest{color} (owner: Christo) # {color:#00875a}ClientUtilsTest{color} (owner: Christo) # {color:#00875a}HighAvailabilityStreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}TopologyTest{color} (owner: Christo) # {color:#00875a}KTableSuppressProcessorTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingSessionBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedWindowBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingWindowBytesStoreTest{color} (owner: Christo) # 
{color:#00875a}MeteredTimestampedWindowStoreTest{color} (owner: Christo) # {color:#00875a}StreamsRebalanceListenerTest{color} (owner: Christo) # {color:#00875a}TimestampedKeyValueStoreMaterializerTest{color} (owner: Christo) # {color:#00875a}CachingInMemoryKeyValueStoreTest{color} (owner: Christo) # {color:#00875a}CachingInMemorySessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentSessionStoreTest{color} (owner: Christo) # {color:#00875a}CachingPersistentWindowStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}ChangeLoggingTimestampedKeyValueBytesStoreTest{color} (owner: Christo) # {color:#00875a}CompositeReadOnlyWindowStoreTest{color} (owner: Christo) # {color:#00875a}KeyValueStoreBuilderTest{color} (owner: Christo) # {color:#00875a}RocksDBStoreTest{color} (owner: Christo) # {color:#00875a}StreamThreadStateStoreProviderTest{color} (owner: Christo) # {color:#ff8b00}TaskManagerTest{color} (owner: Christo) # {color:#00875a}InternalTopicManagerTest{color} (owner: Christo) # {color:#00875a}ProcessorContextImplTest{color} (owner: Christo) # {color:#00875a}WriteConsistencyVectorTest{color} (owner: Christo) # {color:#00875a}StreamsAssignmentScaleTest{color} (owner: Christo) # {color:#00875a}StreamsPartitionAssignorTest{color} (owner: Christo) # {color:#00875a}AssignmentTestUtils{color} (owner: Christo) # {color:#ff8b00}ProcessorStateManagerTest{color} (owner: Matthew) # {color:#ff8b00}StandbyTaskTest{color} (owner: Matthew) # {color:#ff8b00}StoreChangelogReaderTest{color} (owner: Matthew) # {color:#ff8b00}StreamTaskTest{color} (owner: Matthew) # {color:#ff8b00}StreamThreadTest{color} (owner: Matthew) # {color:#ff8b00}StreamsMetricsImplTest{color} (owner: Dalibor) (Captured in https://issues.apache.org/jira/browse/KAFKA-12947) # {color:#ff8b00}TimeOrderedCachingPersistentWindowStoreTest{color} (owner: [~shekharrajak]) [https://github.com/apache/kafka/pull/12739] # 
{color:#00875a}TimeOrderedWindowStoreTest{color} (owners: [~shekharrajak]) [https://github.com/apache/kafka/pull/12777] *The coverage report for the above tests after the change should be >= to what the coverage is now.* was: {color:#de350b}There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.{color} Unless stated in brackets the tests are in the streams module. A list of tests which still require to be moved from EasyMock to Mockito as of 2nd of August 2022 which do not have a Jira issue and do not have pull requests I am aware of which are opened: {color:#ff8b00}In Review{color} {color:#00875a}Merged{color} # {color:#00875a}WorkerConnectorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}WorkerCoordinatorTest{color} (connect) (owner: [~yash.mayya] ) # {color:#00875a}RootResourceTest
[GitHub] [kafka] clolov commented on pull request #12777: Replace EasyMock and PowerMock with Mockito - TimeOrderedWindowStoreTest
clolov commented on PR #12777: URL: https://github.com/apache/kafka/pull/12777#issuecomment-1416159492 Great, thank you very much @guozhangwang!
[GitHub] [kafka] fvaleri commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
fvaleri commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1096043542 ## tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java: ## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.File; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.Arrays; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Scanner; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +/** + * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). + * + * This utility expects at least one of the following two arguments - + * 1. A list of state change log files + * 2. A regex to specify state change log file names. + * + * This utility optionally also accepts the following arguments - + * 1. The topic whose state change logs should be merged + * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument + * is explicitly specified) + * 3. Start time from when the logs should be merged + * 4. 
End time until when the logs should be merged + */ +public class StateChangeLogMerger { +private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS"; +private static final Pattern DATE_PATTERN = Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}"); +private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + Topic.LEGAL_CHARS + "+),([0-9]+)\\]"); + +public static void main(String[] args) { +try { +StateChangeLogMergerOptions options = new StateChangeLogMergerOptions(args); +if (printHelpOrVersion(options) || !validInput(options)) return; Review Comment: Yes, that was the original intention. Thanks.
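The class Javadoc above describes merging state change logs obtained from several brokers. One way to do that, assuming each input is already ordered by time, is a k-way merge with a `PriorityQueue` keyed on each file's next timestamp; because `DATE_PATTERN` timestamps are fixed-width with the year first, they sort lexicographically in chronological order. This is a hypothetical sketch (`LogMergeSketch` works on in-memory lists and assumes every line carries a timestamp), not the tool's implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class LogMergeSketch {
    // Same shape as DATE_PATTERN above: four-digit year first, millisecond precision.
    static final Pattern DATE = Pattern.compile("[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}");

    // Assumes every line contains a timestamp; the fixed-width format sorts
    // lexicographically in time order, so no date parsing is needed here.
    static String timestamp(String line) {
        Matcher m = DATE.matcher(line);
        if (!m.find()) {
            throw new IllegalArgumentException("no timestamp in: " + line);
        }
        return m.group();
    }

    // Position within one broker's log.
    private static final class Cursor {
        final List<String> lines;
        int next;

        Cursor(List<String> lines) {
            this.lines = lines;
        }

        String head() {
            return lines.get(next);
        }
    }

    // k-way merge of per-broker logs, each already in time order.
    static List<String> merge(List<List<String>> logs) {
        PriorityQueue<Cursor> heap = new PriorityQueue<>(Comparator.comparing((Cursor c) -> timestamp(c.head())));
        for (List<String> log : logs) {
            if (!log.isEmpty()) {
                heap.add(new Cursor(log));
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Cursor c = heap.poll();
            merged.add(c.head());
            c.next++;
            if (c.next < c.lines.size()) {
                heap.add(c); // re-insert so it is ordered by its new head line
            }
        }
        return merged;
    }
}
```

The heap holds at most one entry per input file, so memory stays proportional to the number of brokers rather than the total number of lines when the inputs are streamed.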
[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683961#comment-17683961 ] Hector Geraldino commented on KAFKA-14132: -- Hey [~divijvaidya], I'm putting together a KIP around [https://github.com/apache/kafka/pull/13193], in which I'll propose adding new metric counters for filtered (skipped) records. The POC is code complete, but before it is ready I need to add new test cases to the *WorkerSinkTaskTest* and *WorkerSinkTaskThreadedTest* unit tests. These two are assigned to you, and I was wondering if you're OK with me picking them up, as this would unblock my other work. I did the same for https://issues.apache.org/jira/browse/KAFKA-14659 (I had to raise [https://github.com/apache/kafka/pull/13191] before adding tests for that bug fix). Wdyt? > Remaining PowerMock to Mockito tests > > > Key: KAFKA-14132 > URL: https://issues.apache.org/jira/browse/KAFKA-14132 > Project: Kafka > Issue Type: Sub-task >Reporter: Christo Lolov >Assignee: Christo Lolov >Priority: Major > > {color:#de350b}Some of the tests below use EasyMock as well. For those > migrate both PowerMock and EasyMock to Mockito.{color} > Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as > of 2nd of August 2022 which do not have a Jira issue and do not have pull > requests I am aware of which are opened: > {color:#ff8b00}InReview{color} > {color:#00875a}Merged{color} > # {color:#00875a}ErrorHandlingTaskTest{color} (owner: [~shekharrajak]) > # {color:#00875a}SourceTaskOffsetCommiterTest{color} (owner: Christo) > # {color:#00875a}WorkerMetricsGroupTest{color} (owner: Divij) > # WorkerSinkTaskTest (owner: Divij) *WIP* > # WorkerSinkTaskThreadedTest (owner: Divij) *WIP* > # {color:#00875a}WorkerTaskTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ErrorReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya]) > # {color:#00875a}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya]) > # {color:#00875a}ConnectorsResourceTest{color} (owner: [~mdedetrich-aiven]) > # {color:#ff8b00}StandaloneHerderTest{color} (owner: [~mdedetrich-aiven]) > ([https://github.com/apache/kafka/pull/12728]) > # KafkaConfigBackingStoreTest (owner: [~mdedetrich-aiven]) > # {color:#00875a}KafkaOffsetBackingStoreTest{color} (owner: Christo) > ([https://github.com/apache/kafka/pull/12418]) > # {color:#ff8b00}KafkaBasedLogTest{color} (owner: [~mdedetrich-aiven]) > # RetryUtilTest (owner: [~mdedetrich-aiven] ) > # {color:#00875a}RepartitionTopicTest{color} (streams) (owner: Christo) > # {color:#00875a}StateManagerUtilTest{color} (streams) (owner: Christo) > *The coverage report for the above tests after the change should be >= to > what the coverage is now.*
[jira] [Comment Edited] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683951#comment-17683951 ] Matthias J. Sax edited comment on KAFKA-14660 at 2/3/23 4:52 PM: - Not sure why the PR was not auto-linked... Fixed. [https://github.com/apache/kafka/pull/13175] Thanks for your follow-up. Can we close this ticket? Let me know if there is anything else I can do. was (Author: mjsax): Not sure why the PR was not auto-linked... Fixed. [https://github.com/apache/kafka/pull/13175] > Divide by zero security vulnerability (sonatype-2019-0422) > -- > > Key: KAFKA-14660 > URL: https://issues.apache.org/jira/browse/KAFKA-14660 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Andy Coates >Assignee: Matthias J. Sax >Priority: Minor > Fix For: 3.5.0 > > > Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR > and, because the PR was never merged, is now reporting it as a security > vulnerability in the latest Kafka Streams library. > > See: > * [Vulnerability: sonatype-2019-0422|https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0] > * [Original PR|https://github.com/apache/kafka/pull/7414] > > While it looks from the comments made by [~mjsax] and [~bbejeck] that the > divide-by-zero is not really an issue, the fact that it's now being reported > as a vulnerability is an issue, especially with regulators. > PITA, but we should consider either getting this vulnerability removed > (Google wasn't very helpful in providing info on how to do this), or fixed > (again, not sure how to tag the fix as fixing this issue). One option may > just be to reopen the PR and merge it (and then fix forward by switching it to > throw an exception).
[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)
[ https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683951#comment-17683951 ] Matthias J. Sax commented on KAFKA-14660: - Not sure why the PR was not auto-linked... Fixed. [https://github.com/apache/kafka/pull/13175] > Divide by zero security vulnerability (sonatype-2019-0422) > -- > > Key: KAFKA-14660 > URL: https://issues.apache.org/jira/browse/KAFKA-14660 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 >Reporter: Andy Coates >Assignee: Matthias J. Sax >Priority: Minor > Fix For: 3.5.0 > > > Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR > and, because the PR was never merged, is now reporting it as a security > vulnerability in the latest Kafka Streams library. > > See: > * [Vulnerability: sonatype-2019-0422|https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0] > * [Original PR|https://github.com/apache/kafka/pull/7414] > > While it looks from the comments made by [~mjsax] and [~bbejeck] that the > divide-by-zero is not really an issue, the fact that it's now being reported > as a vulnerability is an issue, especially with regulators. > PITA, but we should consider either getting this vulnerability removed > (Google wasn't very helpful in providing info on how to do this), or fixed > (again, not sure how to tag the fix as fixing this issue). One option may > just be to reopen the PR and merge it (and then fix forward by switching it to > throw an exception).
[GitHub] [kafka] mimaison commented on a diff in pull request #13171: KAFKA-14584: Move StateChangeLogMerger tool
mimaison commented on code in PR #13171: URL: https://github.com/apache/kafka/pull/13171#discussion_r1094648508 ## tools/src/main/java/org/apache/kafka/tools/StateChangeLogMerger.java: ## @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; +import org.apache.kafka.server.util.ToolsUtils; + +import java.io.File; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.Serializable; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.Arrays; +import java.util.List; +import java.util.PriorityQueue; +import java.util.Scanner; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static java.lang.String.format; + +/** + * A utility that merges the state change logs (possibly obtained from different brokers and over multiple days). + * + * This utility expects at least one of the following two arguments - + * 1. A list of state change log files + * 2. A regex to specify state change log file names. + * + * This utility optionally also accepts the following arguments - + * 1. The topic whose state change logs should be merged + * 2. A list of partitions whose state change logs should be merged (can be specified only when the topic argument + * is explicitly specified) + * 3. Start time from when the logs should be merged + * 4. 
End time until when the logs should be merged + */ +public class StateChangeLogMerger { +private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS"; +private static final Pattern DATE_PATTERN = Pattern.compile("([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2},[0-9]{3}){1}"); +private static final Pattern TOPIC_PART_PATTERN = Pattern.compile("\\[(" + Topic.LEGAL_CHARS + "+),([0-9]+)\\]"); + +public static void main(String[] args) { +try { +StateChangeLogMergerOptions options = new StateChangeLogMergerOptions(args); +if (printHelpOrVersion(options) || !validInput(options)) return; Review Comment: What about doing: ``` CommandLineUtils.maybePrintHelpOrVersion(options, "A tool for merging the log files from several brokers to reconstruct a unified history of what happened."); validInput(options); ``` And remove the `printHelpOrVersion()` method and have `validInput()` return `void`? ## tools/src/test/java/org/apache/kafka/tools/StateChangeLogMergerTest.java: ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.tools; + +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java
[jira] [Updated] (KAFKA-14673) Add high watermark listener to Partition/Log layers
[ https://issues.apache.org/jira/browse/KAFKA-14673?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-14673: Summary: Add high watermark listener to Partition/Log layers (was: High watermark listener) > Add high watermark listener to Partition/Log layers > --- > > Key: KAFKA-14673 > URL: https://issues.apache.org/jira/browse/KAFKA-14673 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] dajac opened a new pull request, #13196: KAFKA-14673; Add high watermark listener to Partition/Log layers
dajac opened a new pull request, #13196: URL: https://github.com/apache/kafka/pull/13196 In the context of KIP-848, we are implementing a new group coordinator in Java. This new coordinator follows the architecture of the new quorum controller. It is basically a replicated state machine that writes to the log and commits its internal state when the writes are committed. At the moment, the only way to know when a write is committed is to use a delayed fetch. This is not ideal in our context because a delayed fetch can be completed before the write is actually committed to the log. This patch proposes to introduce a high watermark listener to the Partition/Log layers. This will allow the new group coordinator to simply listen for high watermark changes and commit its state accordingly. This mechanism is also simpler and lighter. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
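The PR description contrasts waiting on a delayed fetch with being notified when the high watermark advances. A minimal single-threaded sketch of the listener approach, assuming a callback that receives the new high watermark and a coordinator that buffers uncommitted writes keyed by the log end offset each write produces. Records below the high watermark are committed, so a write is committed once the high watermark reaches its end offset. Every class and method name here is hypothetical and not taken from PR #13196.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical callback; not necessarily the interface added by the PR.
interface ToyHighWatermarkListener {
    void onHighWatermarkUpdated(String partition, long newHighWatermark);
}

// Hypothetical stand-in for the Partition/Log layer.
class ToyPartition {
    private final String name;
    private final List<ToyHighWatermarkListener> listeners = new ArrayList<>();
    private long highWatermark = 0L;

    ToyPartition(String name) {
        this.name = name;
    }

    void addListener(ToyHighWatermarkListener listener) {
        listeners.add(listener);
    }

    // Called when followers acknowledge new data; listeners only hear about forward progress.
    void maybeIncrementHighWatermark(long newHighWatermark) {
        if (newHighWatermark > highWatermark) {
            highWatermark = newHighWatermark;
            for (ToyHighWatermarkListener listener : listeners) {
                listener.onHighWatermarkUpdated(name, newHighWatermark);
            }
        }
    }
}

// Hypothetical coordinator state machine. Each pending write is keyed by the log end
// offset right after it, so it is committed once the high watermark reaches that offset.
class ToyCoordinator implements ToyHighWatermarkListener {
    private final TreeMap<Long, String> pending = new TreeMap<>();
    final List<String> committed = new ArrayList<>();

    void write(long endOffset, String change) {
        pending.put(endOffset, change);
    }

    @Override
    public void onHighWatermarkUpdated(String partition, long newHighWatermark) {
        // All writes whose end offset is <= the new high watermark are now committed.
        Map<Long, String> ready = pending.headMap(newHighWatermark, true);
        committed.addAll(ready.values());
        ready.clear(); // clearing the view also removes the entries from `pending`
    }
}
```

The sketch ignores concurrency and leadership changes, which a broker-side implementation would have to handle.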
[jira] [Commented] (KAFKA-5756) Synchronization issue on flush
[ https://issues.apache.org/jira/browse/KAFKA-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683888#comment-17683888 ] Mickael Maison commented on KAFKA-5756: --- As far as I can tell this is still an issue. [~gharris1727] Were you planning to propose a fix? > Synchronization issue on flush > -- > > Key: KAFKA-5756 > URL: https://issues.apache.org/jira/browse/KAFKA-5756 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Oleg Kuznetsov >Priority: Major > Fix For: 0.11.0.1, 1.0.0 > > > Access to *OffsetStorageWriter#toFlush* is not synchronized in *doFlush()* > method, whereas this collection can be accessed from 2 different threads: > - *WorkerSourceTask.execute()*, finally block > - *SourceTaskOffsetCommitter*, from periodic flush task
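The issue describes `toFlush` being read in `doFlush()` without the lock that guards its other accesses, while a task thread and the periodic committer thread can both reach it. A minimal sketch of one possible fix, assuming every read and write of the flush snapshot goes through the same monitor. `ToyOffsetWriter` is a hypothetical, heavily simplified stand-in, not Kafka's `OffsetStorageWriter`; it copies the snapshot under the lock so that serialization and I/O could happen outside it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for an offset writer shared by two threads.
class ToyOffsetWriter {
    private Map<String, Long> data = new HashMap<>();  // offsets recorded since the last flush began
    private Map<String, Long> toFlush = null;           // snapshot being flushed, or null

    synchronized void offset(String partition, long offset) {
        data.put(partition, offset);
    }

    // Moves pending offsets into the flush snapshot; returns false if there is nothing to flush.
    synchronized boolean beginFlush() {
        if (toFlush != null) {
            throw new IllegalStateException("A flush is already in progress");
        }
        if (data.isEmpty()) {
            return false;
        }
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    // Reads toFlush under the same lock as every other access, then returns a private copy
    // that can be serialized and written without holding the lock.
    Map<String, Long> doFlush() {
        synchronized (this) {
            if (toFlush == null) {
                throw new IllegalStateException("No flush in progress");
            }
            return new HashMap<>(toFlush);
        }
    }

    synchronized void completeFlush() {
        toFlush = null;
    }

    // Restores the snapshot on failure; offsets recorded after beginFlush() take precedence.
    synchronized void cancelFlush() {
        if (toFlush != null) {
            toFlush.putAll(data);
            data = toFlush;
            toFlush = null;
        }
    }
}
```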
[jira] [Assigned] (KAFKA-14577) Move ConsoleProducer to tools
[ https://issues.apache.org/jira/browse/KAFKA-14577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Alexandre Dupriez reassigned KAFKA-14577: - Assignee: Alexandre Dupriez > Move ConsoleProducer to tools > - > > Key: KAFKA-14577 > URL: https://issues.apache.org/jira/browse/KAFKA-14577 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Alexandre Dupriez >Priority: Major >
[GitHub] [kafka] C0urante merged pull request #13194: MINOR: Update incorrect / misleading comment regarding rebalance exceptions in WorkerSinkTask
C0urante merged PR #13194: URL: https://github.com/apache/kafka/pull/13194
[jira] [Assigned] (KAFKA-14577) Move ConsoleProducer to tools
[ https://issues.apache.org/jira/browse/KAFKA-14577?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim reassigned KAFKA-14577: - Assignee: (was: Omnia Ibrahim) > Move ConsoleProducer to tools > - > > Key: KAFKA-14577 > URL: https://issues.apache.org/jira/browse/KAFKA-14577 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Priority: Major >
[jira] [Assigned] (KAFKA-14579) Move DumpLogSegments to tools
[ https://issues.apache.org/jira/browse/KAFKA-14579?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Omnia Ibrahim reassigned KAFKA-14579: - Assignee: (was: Omnia Ibrahim) > Move DumpLogSegments to tools > - > > Key: KAFKA-14579 > URL: https://issues.apache.org/jira/browse/KAFKA-14579 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Priority: Major >
[GitHub] [kafka] Hangleton commented on a diff in pull request #11627: KAFKA-13565: add consumer exponential backoff for KIP-580
Hangleton commented on code in PR #11627: URL: https://github.com/apache/kafka/pull/11627#discussion_r1095735143 ## clients/src/main/java/org/apache/kafka/common/utils/ExponentialBackoff.java: ## @@ -32,7 +32,16 @@ private final double expMax; private final long initialInterval; private final double jitter; +private long attemptedCount = 0; Review Comment: Is it guaranteed that instances of this class are confined to a single thread, so that no synchronization is needed?
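The question concerns the new mutable `attemptedCount` field. A minimal sketch of a backoff helper that owns its attempt counter, assuming the delay follows `initialInterval * multiplier^min(attempts, expMax)` scaled by a random factor in `[1 - jitter, 1 + jitter)`, which is what the quoted `expMax`, `initialInterval` and `jitter` fields (plus a `multiplier`) suggest. Holding the counter in an `AtomicLong` is one way to stay safe if an instance is ever shared; `CountingBackoff`, `nextBackoff()` and `reset()` are hypothetical names, not the PR's API.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical backoff helper that tracks its own attempt count.
class CountingBackoff {
    private final long initialInterval;
    private final int multiplier;
    private final double expMax;
    private final double jitter;
    // AtomicLong keeps each increment atomic if an instance is shared between threads;
    // a plain long field is only safe when the instance is confined to one thread.
    private final AtomicLong attemptedCount = new AtomicLong(0);

    CountingBackoff(long initialInterval, int multiplier, long maxInterval, double jitter) {
        this.initialInterval = initialInterval;
        this.multiplier = multiplier;
        this.jitter = jitter;
        // Exponent at which initialInterval * multiplier^exp reaches maxInterval.
        this.expMax = maxInterval > initialInterval
            ? Math.log(maxInterval / (double) Math.max(initialInterval, 1)) / Math.log(multiplier)
            : 0;
    }

    // Backoff for a given attempt number, capped at roughly maxInterval before jitter.
    long backoff(long attempts) {
        if (expMax == 0) {
            return initialInterval;
        }
        double exp = Math.min(attempts, expMax);
        double term = initialInterval * Math.pow(multiplier, exp);
        double randomFactor = jitter < Double.MIN_NORMAL
            ? 1.0
            : ThreadLocalRandom.current().nextDouble(1 - jitter, 1 + jitter);
        return (long) (randomFactor * term);
    }

    // Uses the current attempt number and advances it in one atomic step.
    long nextBackoff() {
        return backoff(attemptedCount.getAndIncrement());
    }

    void reset() {
        attemptedCount.set(0);
    }
}
```

An atomic counter only prevents lost updates: two threads sharing one instance would still advance a single shared attempt sequence, so thread confinement, as the reviewer asks about, may be the simpler guarantee.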
[GitHub] [kafka] clolov commented on a diff in pull request #13182: KAFKA-14649: Isolate failures during plugin path scanning to single plugin classes
clolov commented on code in PR #13182: URL: https://github.com/apache/kafka/pull/13182#discussion_r1095729055 ## connect/runtime/src/test/resources/test-plugins/fail-to-initialize/test/plugins/OuterClass.java: ## @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.List; +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.sink.SinkConnector; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + * Defines a connector as an non-static inner class, which does not have a default constructor. Review Comment: ```suggestion * Defines a connector as a non-static inner class, which does not have a default constructor. ``` ## connect/runtime/src/test/resources/test-plugins/fail-to-initialize/test/plugins/CoLocatedPlugin.java: ## @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package test.plugins; + +import java.util.Map; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.storage.Converter; + +/** + * Fake plugin class for testing classloading isolation. + * See {@link org.apache.kafka.connect.runtime.isolation.TestPlugins}. + * This is a plugin co-located with other poorly packaged plugins, but should be visible despite other errors. Review Comment: ```suggestion * This is a plugin co-located with other poorly packaged plugins, but should be visible despite other errors. ```
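The OuterClass javadoc relies on a Java language detail: a non-static inner class has no true no-argument constructor, because javac passes the enclosing instance as a hidden constructor parameter, so reflective instantiation through a no-arg constructor fails. A minimal sketch demonstrating this with plain reflection; `OuterSketch` and `ConstructorCheck` are hypothetical names, and the sketch does not reproduce the Connect plugin scanning code.

```java
import java.lang.reflect.Constructor;

class OuterSketch {
    // Non-static inner class: javac gives it a constructor that takes the enclosing instance.
    class InnerConnector { }

    // Static nested class: gets an ordinary no-argument default constructor.
    static class NestedConnector { }
}

class ConstructorCheck {
    // True if the class declares a constructor with no parameters.
    static boolean hasNoArgConstructor(Class<?> cls) {
        try {
            cls.getDeclaredConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Parameter types of the first declared constructor, for inspection.
    static Class<?>[] firstConstructorParams(Class<?> cls) {
        Constructor<?>[] ctors = cls.getDeclaredConstructors();
        return ctors[0].getParameterTypes();
    }
}
```

For `OuterSketch.InnerConnector`, the only declared constructor takes a single `OuterSketch` parameter, which is why a scanner that expects a no-arg constructor cannot instantiate it.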
[GitHub] [kafka] Hangleton commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
Hangleton commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1095665461 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManager.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g. + * epoch, sequence number, last offset, etc.) + * + * The sequence number is the last number successfully appended to the partition for the given identifier. + * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message + * appended to the partition. + * + * As long as a producer id is contained in the map, the corresponding producer can continue to write data. 
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from + * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure + * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to + * age. This ensures that producer ids will not be expired until either the max expiration time has been reached, + * or if the topic also is configured for deletion, the segment containing the last written offset has + * been deleted. + */ +public class ProducerStateManager { +private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class.getName()); + +// Remove these once UnifiedLog moves to storage module. +public static final String DELETED_FILE_SUFFIX = ".deleted"; +public static final String PRODUCER_SNAPSHOT_FILE_SUFFIX = ".snapshot"; + +public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000; + +private static final short PRODUCER_SNAPSHOT_VERSION = 1; +private static final String VERSION_FIELD = "version"; +private static final String CRC_FIELD = "crc"; +private static final String PRODUCER_ID_FIELD = "producer_id"; +private static final String LAST_SEQUENCE_FIELD = "last_sequence"; +private static final String PRODUCER_EPOCH_FIELD = "epoch"; +private static final String LAST_OFFSET_FIELD = "last_offset"; +private static final String OFFSET_DELTA_FIELD = "offset_delta"; +private static final String TIMESTAMP_FIELD = "timestamp"; +private static final String PRODUCER_ENTRIES_FIELD = "producer_entries"; +private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch"; +private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = "current_txn_first_offset"; + +private static final int VERSION_OFFSET = 0; +private static final int CRC_OFFSET = VERSION_OFFSET + 2; +private static final int PR
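The quoted constants describe a snapshot file that starts with a 2-byte version at `VERSION_OFFSET`, followed by a 4-byte CRC at `CRC_OFFSET`. A minimal sketch of writing and verifying such a header, assuming (as the offsets suggest, though the truncated quote does not show it) that the CRC covers everything after the CRC field. It uses the JDK's `java.util.zip.CRC32C` (Java 9+) in place of Kafka's `Crc32C` utility; `SnapshotHeaderSketch` and `PAYLOAD_OFFSET` are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32C;

// Hypothetical writer/checker for a "version + CRC + payload" snapshot header.
class SnapshotHeaderSketch {
    static final int VERSION_OFFSET = 0;
    static final int CRC_OFFSET = VERSION_OFFSET + 2;   // after the 2-byte version
    // Assumption: the payload starts right after the 4-byte CRC field.
    static final int PAYLOAD_OFFSET = CRC_OFFSET + 4;

    static long crcOf(byte[] bytes, int offset, int length) {
        CRC32C crc = new CRC32C();
        crc.update(bytes, offset, length);
        return crc.getValue();  // unsigned 32-bit value held in a long
    }

    static byte[] write(short version, byte[] payload) {
        ByteBuffer buffer = ByteBuffer.allocate(PAYLOAD_OFFSET + payload.length);
        buffer.putShort(VERSION_OFFSET, version);
        buffer.position(PAYLOAD_OFFSET);
        buffer.put(payload);
        byte[] bytes = buffer.array();
        // The checksum covers only the payload, so it can be written into the header afterwards.
        buffer.putInt(CRC_OFFSET, (int) crcOf(bytes, PAYLOAD_OFFSET, payload.length));
        return bytes;
    }

    static boolean verify(byte[] bytes) {
        long stored = ByteBuffer.wrap(bytes).getInt(CRC_OFFSET) & 0xFFFFFFFFL;
        return stored == crcOf(bytes, PAYLOAD_OFFSET, bytes.length - PAYLOAD_OFFSET);
    }
}
```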
[GitHub] [kafka] clolov commented on pull request #13193: KAFKA-14659 source-record-write-[rate|total] metrics include filtered records
clolov commented on PR #13193: URL: https://github.com/apache/kafka/pull/13193#issuecomment-1415746102 Thank you for spotting this behaviour and contributing a fix! The proposed solution makes sense to me. I will just add @C0urante to put the appropriate "connect" tags on this pull request, and I will review it once it has been rebased.
[jira] [Commented] (KAFKA-14582) Move JmxTool to tools
[ https://issues.apache.org/jira/browse/KAFKA-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683838#comment-17683838 ] Federico Valeri commented on KAFKA-14582: - https://github.com/apache/kafka/pull/13195 > Move JmxTool to tools > - > > Key: KAFKA-14582 > URL: https://issues.apache.org/jira/browse/KAFKA-14582 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Federico Valeri >Priority: Major > Fix For: 3.5.0 > >
[GitHub] [kafka] clolov commented on pull request #13184: KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation
clolov commented on PR #13184: URL: https://github.com/apache/kafka/pull/13184#issuecomment-1415734118 I see what you mean. Okay, I am happy with it. Could you also rename the variables from `transformation` to `transformationStage`?
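The refactoring named in the PR title separates the "apply this transformation only when a predicate matches" logic from the `Transformation` interface itself. A minimal sketch of that logic, assuming the stage holds an optional predicate, a `negate` flag and the wrapped transformation. `ToyTransformationStage` is hypothetical and uses `java.util.function` types instead of Kafka Connect's `Predicate` and `Transformation` interfaces.

```java
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified stage over arbitrary records; Kafka's real classes operate on ConnectRecord.
final class ToyTransformationStage<R> {
    private final Predicate<R> predicate;      // null: apply the transformation to every record
    private final boolean negate;              // invert the predicate result
    private final Function<R, R> transformation;

    ToyTransformationStage(Predicate<R> predicate, boolean negate, Function<R, R> transformation) {
        this.predicate = predicate;
        this.negate = negate;
        this.transformation = transformation;
    }

    // Applies the transformation only when the (optionally negated) predicate matches;
    // otherwise the record passes through unchanged.
    R apply(R record) {
        boolean matches = predicate == null || (negate ^ predicate.test(record));
        return matches ? transformation.apply(record) : record;
    }
}
```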
[jira] [Commented] (KAFKA-14582) Move JmxTool to tools
[ https://issues.apache.org/jira/browse/KAFKA-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683796#comment-17683796 ] Mickael Maison commented on KAFKA-14582: Can you open a small PR to add this to the docs? For each version we have a short Notable Changes section. For example: https://kafka.apache.org/documentation/#upgrade_33_notable > Move JmxTool to tools > - > > Key: KAFKA-14582 > URL: https://issues.apache.org/jira/browse/KAFKA-14582 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Federico Valeri >Priority: Major > Fix For: 3.5.0 > >
[jira] [Commented] (KAFKA-14593) Move LeaderElectionCommand to tools
[ https://issues.apache.org/jira/browse/KAFKA-14593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17683783#comment-17683783 ] Alexandre Dupriez commented on KAFKA-14593: --- Hello [~omnia_h_ibrahim], is this JIRA being actively worked on? If not, would you be happy to delegate it? Many thanks, Alexandre > Move LeaderElectionCommand to tools > --- > > Key: KAFKA-14593 > URL: https://issues.apache.org/jira/browse/KAFKA-14593 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Omnia Ibrahim >Priority: Major >
[GitHub] [kafka] nizhikov commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module
nizhikov commented on PR #13157: URL: https://github.com/apache/kafka/pull/13157#issuecomment-1415407654 Hello @mimaison, do we still want to fix the javadoc for public interfaces?
[GitHub] [kafka] Hangleton commented on a diff in pull request #13040: KAFKA-14480 Move/Rewrite ProducerStateManager to storage module.
Hangleton commented on code in PR #13040: URL: https://github.com/apache/kafka/pull/13040#discussion_r1095498625 ## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateManager.java: ## @@ -0,0 +1,671 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.SchemaException; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.ByteUtils; +import org.apache.kafka.common.utils.Crc32C; +import org.apache.kafka.common.utils.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.text.NumberFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Maintains a mapping from ProducerIds to metadata about the last appended entries (e.g. + * epoch, sequence number, last offset, etc.) + * + * The sequence number is the last number successfully appended to the partition for the given identifier. + * The epoch is used for fencing against zombie writers. The offset is the one of the last successful message + * appended to the partition. + * + * As long as a producer id is contained in the map, the corresponding producer can continue to write data. 
+ * However, producer ids can be expired due to lack of recent use or if the last written entry has been deleted from + * the log (e.g. if the retention policy is "delete"). For compacted topics, the log cleaner will ensure + * that the most recent entry from a given producer id is retained in the log provided it hasn't expired due to + * age. This ensures that producer ids will not be expired until either the max expiration time has been reached, + * or if the topic also is configured for deletion, the segment containing the last written offset has + * been deleted. + */ +public class ProducerStateManager { +private static final Logger log = LoggerFactory.getLogger(ProducerStateManager.class.getName()); + +// Remove these once UnifiedLog moves to storage module. +public static final String DELETED_FILE_SUFFIX = ".deleted"; +public static final String PRODUCER_SNAPSHOT_FILE_SUFFIX = ".snapshot"; + +public static final long LATE_TRANSACTION_BUFFER_MS = 5 * 60 * 1000; + +private static final short PRODUCER_SNAPSHOT_VERSION = 1; +private static final String VERSION_FIELD = "version"; +private static final String CRC_FIELD = "crc"; +private static final String PRODUCER_ID_FIELD = "producer_id"; +private static final String LAST_SEQUENCE_FIELD = "last_sequence"; +private static final String PRODUCER_EPOCH_FIELD = "epoch"; +private static final String LAST_OFFSET_FIELD = "last_offset"; +private static final String OFFSET_DELTA_FIELD = "offset_delta"; +private static final String TIMESTAMP_FIELD = "timestamp"; +private static final String PRODUCER_ENTRIES_FIELD = "producer_entries"; +private static final String COORDINATOR_EPOCH_FIELD = "coordinator_epoch"; +private static final String CURRENT_TXN_FIRST_OFFSET_FIELD = "current_txn_first_offset"; + +private static final int VERSION_OFFSET = 0; +private static final int CRC_OFFSET = VERSION_OFFSET + 2; +private static final int PR