[jira] [Updated] (KAFKA-14930) Public documentation for new Kafka Connect offset management REST APIs
[ https://issues.apache.org/jira/browse/KAFKA-14930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yash Mayya updated KAFKA-14930: --- Description: Add public documentation for the new Kafka Connect offset management REST APIs from [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] being introduced in 3.6: * *PATCH* /connectors/\{connector}/offsets * *DELETE* /connectors/\{connector}/offsets was: Add public documentation for the 3 new Kafka Connect offset management REST APIs being introduced in [KIP-875:|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] * *PATCH* /connectors/\{connector}/offsets * *DELETE* /connectors/\{connector}/offsets > Public documentation for new Kafka Connect offset management REST APIs > -- > > Key: KAFKA-14930 > URL: https://issues.apache.org/jira/browse/KAFKA-14930 > Project: Kafka > Issue Type: Sub-task > Components: KafkaConnect >Reporter: Mickael Maison >Assignee: Yash Mayya >Priority: Major > Fix For: 3.6.0 > > > Add public documentation for the new Kafka Connect offset management REST > APIs from > [KIP-875|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect] > being introduced in 3.6: > * *PATCH* /connectors/\{connector}/offsets > * *DELETE* /connectors/\{connector}/offsets -- This message was sent by Atlassian Jira (v8.20.10#820010)
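As a hedged illustration of the two endpoints listed above, the sketch below builds (but does not send) the corresponding HTTP requests with the JDK's `java.net.http` client. The base URL `http://localhost:8083`, the connector name `my-sink`, the topic `orders`, and the helper class `ConnectOffsetsRequests` are assumptions made for illustration. The JSON body follows the sink-connector shape described in KIP-875 as I understand it and should be checked against the published documentation.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper, not part of Kafka: builds requests for the KIP-875 offsets endpoints.
final class ConnectOffsetsRequests {

    // Appends /connectors/{connector}/offsets to the worker's base URL.
    // Assumes the connector name needs no URL encoding.
    private static URI offsetsUri(String baseUrl, String connector) {
        return URI.create(baseUrl + "/connectors/" + connector + "/offsets");
    }

    // PATCH /connectors/{connector}/offsets with a JSON body describing the new offsets.
    static HttpRequest patchOffsets(String baseUrl, String connector, String jsonBody) {
        return HttpRequest.newBuilder(offsetsUri(baseUrl, connector))
                .header("Content-Type", "application/json")
                .method("PATCH", HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
    }

    // DELETE /connectors/{connector}/offsets to reset the connector's offsets.
    static HttpRequest deleteOffsets(String baseUrl, String connector) {
        return HttpRequest.newBuilder(offsetsUri(baseUrl, connector)).DELETE().build();
    }

    public static void main(String[] args) {
        // Assumed sink-connector body shape; verify against the KIP before relying on it.
        String body = "{\"offsets\":[{\"partition\":{\"kafka_topic\":\"orders\",\"kafka_partition\":0},"
                + "\"offset\":{\"kafka_offset\":100}}]}";
        HttpRequest patch = patchOffsets("http://localhost:8083", "my-sink", body);
        HttpRequest delete = deleteOffsets("http://localhost:8083", "my-sink");
        System.out.println(patch.method() + " " + patch.uri());
        System.out.println(delete.method() + " " + delete.uri());
    }
}
```

The requests are only constructed here so the example stays runnable without a Connect cluster. As far as I recall from KIP-875, the connector has to be stopped before its offsets can be altered or reset, which is worth confirming in the final docs.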
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1186812350 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,6 +622,176 @@ public String toString() { } } +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadata = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadata.isPresent()) { +String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +int startPos = lookupPositionForOffset(rlsMetadata.get(), offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata.get(), startPos); +RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + +RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + +if (firstBatch == null) +return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, +includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + +int updatedFetchSize = +remoteStorageFetchInfo.minOneMessage && firstBatch.sizeInBytes() > maxBytes +? firstBatch.sizeInBytes() : maxBytes; + +ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize); +int remainingBytes = updatedFetchSize; + +firstBatch.writeTo(buffer); +remainingBytes -= firstBatch.sizeInBytes(); + +if (remainingBytes > 0) { +// input stream is read till (startPos - 1) while getting the batch of records earlier. +// read the input stream until min of (EOF stream or buffer's remaining capacity). 
+Utils.readFully(remoteSegInputStream, buffer); +} +buffer.flip(); + +FetchDataInfo fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.readableRecords(buffer)); +if (includeAbortedTxns) { +fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), rlsMetadata.get(), fetchDataInfo); +} + +return fetchDataInfo; +} finally { +Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream"); +} +} + +private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) { +return indexCache.lookupOffset(remoteLogSegmentMetadata, offset); +} + +private FetchDataInfo addAbortedTransactions(long startOffset, + RemoteLogSegmentMetadata segmentMetadata, + FetchDataInfo fetchInfo) throws RemoteStorageException { +int fetchSize = fetchInfo.records.sizeInBytes(); +OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset, +fetchInfo.fetchOffsetMetadata.relativePositionInSegment); + +OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex(); +long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize) +.map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1); + +final List abortedTransactions = new ArrayList<>(); + +Consumer> accumulator = +abortedTxns -> abortedTransactions.addAll(abortedTxns.stream() + .map(AbortedTxn
[GitHub] [kafka] satishd commented on a diff in pull request #13535: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory.
satishd commented on code in PR #13535: URL: https://github.com/apache/kafka/pull/13535#discussion_r1186813113 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -600,25 +622,208 @@ public String toString() { } } -long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException { -Optional offset = Optional.empty(); -Optional maybeLog = fetchLog.apply(topicIdPartition.topicPartition()); -if (maybeLog.isPresent()) { -UnifiedLog log = maybeLog.get(); -Option maybeLeaderEpochFileCache = log.leaderEpochCache(); -if (maybeLeaderEpochFileCache.isDefined()) { -LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get(); -OptionalInt epoch = cache.latestEpoch(); -while (!offset.isPresent() && epoch.isPresent()) { -offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt()); -epoch = cache.previousEpoch(epoch.getAsInt()); +public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException { +int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes; +TopicPartition tp = remoteStorageFetchInfo.topicPartition; +FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo; + +boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED; + +long offset = fetchInfo.fetchOffset; +int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes); + +Optional logOptional = fetchLog.apply(tp); +OptionalInt epoch = OptionalInt.empty(); + +if (logOptional.isPresent()) { +Option leaderEpochCache = logOptional.get().leaderEpochCache(); +if (leaderEpochCache.isDefined()) { +epoch = leaderEpochCache.get().epochForOffset(offset); +} +} + +Optional rlsMetadataOptional = epoch.isPresent() +? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset) +: Optional.empty(); + +if (!rlsMetadataOptional.isPresent()) { +String epochStr = (epoch.isPresent()) ? 
Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE"; +throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch " ++ epochStr + " and partition " + tp + " which does not exist in remote tier."); +} + +RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get(); +int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset); +InputStream remoteSegInputStream = null; +try { +// Search forward for the position of the last offset that is greater than or equal to the target offset +remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos); +RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream); + +RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset); + +if (firstBatch == null) +return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false, +includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty()); + +// An empty record is sent instead of an incomplete batch when there is no minimum-one-message constraint +// and for FetchRequest version 3 and above and the first batch size is more than maximum bytes that can be sent. +int firstBatchSize = firstBatch.sizeInBytes(); +if (!remoteStorageFetchInfo.minOneMessage && +!remoteStorageFetchInfo.hardMaxBytesLimit && +firstBatchSize > maxBytes) { +return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY); +} + +int updatedFetchSize = +remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes; + +ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize); +int remainingBytes = updatedFetchSize; + +firstBatch.writeTo(buffer); +remainingBytes -= firstBatchSize; + +if (remainingBytes > 0) { +// read the input stream until min of (EOF stream or buffer's remaining capacity). 
+Utils.readFully(remoteSegInputStream, buffer); +} +buffer.flip(); + +FetchDataInfo fetchDataInfo = new FetchDataInfo( +new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos), +MemoryRecords.readableRecords(buffer)); +if (includeAbortedTxns) { +
[GitHub] [kafka] dajac commented on a diff in pull request #13638: KAFKA-14462; [11/N] Add CurrentAssignmentBuilder
dajac commented on code in PR #13638: URL: https://github.com/apache/kafka/pull/13638#discussion_r1186823970 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/CurrentAssignmentBuilder.java: ## @@ -0,0 +1,445 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.consumer; + +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the + * consumer group protocol. Given the current state of a member and a desired or target + * assignment state, the state machine takes the necessary steps to converge them. + * + * The member state has the following properties: + * - Current Epoch: + * The current epoch of the member. + * + * - Next Epoch: + * The desired epoch of the member. It corresponds to the epoch of the target/desired assignment. 
+ * The member transitions to this epoch when it has revoked the partitions that it does not own + * or if it does not have to revoke any. + * + * - Previous Epoch: + * The epoch of the member when the state was last updated. + * + * - Assigned Partitions: + * The set of partitions currently assigned to the member. This represents what the member should have. + * + * - Partitions Pending Revocation: + * The set of partitions that the member should revoke before it can transition to the next state. + * + * - Partitions Pending Assignment: + * The set of partitions that the member will eventually receive. The partitions in this set are + * still owned by other members in the group. + * + * The state machine has four states: + * - NEW_TARGET_ASSIGNMENT: Review Comment: We do but it is a transient state so it is not part of the member's states. We transition to it [here](https://github.com/apache/kafka/pull/13638/files/0395a1780acd73e735bfee21da733075db85504d#diff-6c5e23803064f4b7a122eee29736d036b9cfe244e69f48751ba163d62e2bf35fR176). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-14972) Make KafkaConsumer usable in async runtimes
Erik van Oosten created KAFKA-14972: --- Summary: Make KafkaConsumer usable in async runtimes Key: KAFKA-14972 URL: https://issues.apache.org/jira/browse/KAFKA-14972 Project: Kafka Issue Type: Wish Components: consumer Reporter: Erik van Oosten KafkaConsumer contains a check that rejects nested invocations from different threads (method {{acquire}}). For users of an async runtime, this requirement is almost impossible to meet. Examples of affected async runtimes are Kotlin coroutines (see KAFKA-7143) and ZIO. We propose to replace the thread-id check with an access-id that is stored in a thread-local variable. Existing programs will not be affected. Developers who work in an async runtime can pick up the access-id and set it on the thread-local variable in a thread of their choosing. Every time a callback is invoked, a new access-id is generated. When the callback completes, the previous access-id is restored. This proposal does not make it impossible to use the client incorrectly. However, we think it strikes a good balance between making correct usage from an async runtime possible and making incorrect usage difficult. Alternatives considered: # Configuration that switches off the check completely.
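The access-id idea in this ticket can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the KafkaConsumer implementation: the class `AccessIdGuard` and the methods `currentAccessId` and `adoptAccessId` are hypothetical names, and the per-callback generation of a fresh access-id mentioned in the proposal is left out.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of an access-id based guard; names are illustrative only.
final class AccessIdGuard {
    private static final long NO_OWNER = -1L;
    private static final AtomicLong NEXT_ACCESS_ID = new AtomicLong(1);

    // Each thread starts with its own unique access-id, so existing
    // single-threaded programs behave as with a thread-id check.
    private static final ThreadLocal<Long> ACCESS_ID =
            ThreadLocal.withInitial(() -> NEXT_ACCESS_ID.getAndIncrement());

    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger refCount = new AtomicInteger(0);

    static long currentAccessId() {
        return ACCESS_ID.get();
    }

    // An async runtime calls this on whichever thread continues the work.
    static void adoptAccessId(long accessId) {
        ACCESS_ID.set(accessId);
    }

    void acquire() {
        long accessId = ACCESS_ID.get();
        // Nested calls with the same access-id pass; a different id is rejected.
        if (accessId != owner.get() && !owner.compareAndSet(NO_OWNER, accessId)) {
            throw new ConcurrentModificationException("guarded object is not safe for concurrent access");
        }
        refCount.incrementAndGet();
    }

    void release() {
        // The outermost release frees the guard for other access-ids.
        if (refCount.decrementAndGet() == 0) {
            owner.set(NO_OWNER);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        AccessIdGuard guard = new AccessIdGuard();
        long accessId = currentAccessId();
        guard.acquire();
        Thread worker = new Thread(() -> {
            adoptAccessId(accessId); // without this line, acquire() below would throw
            guard.acquire();
            System.out.println("nested call accepted on " + Thread.currentThread().getName());
            guard.release();
        });
        worker.start();
        worker.join();
        guard.release();
    }
}
```

The design choice illustrated here is that ownership is tied to a value the caller can carry across threads rather than to the thread itself, which is what lets a coroutine or fiber resume on a different thread while misuse from an unrelated thread is still rejected.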
[GitHub] [kafka] mehbey opened a new pull request, #13681: KAFKA-14133: Migrate ActiveTaskCreator mock in TaskManagerTest to Mockito
mehbey opened a new pull request, #13681: URL: https://github.com/apache/kafka/pull/13681 This pull request migrates the ActiveTaskCreator mock in TaskManagerTest from EasyMock to Mockito. The change is restricted to a single mock to minimize the scope and make it easier to review. Please see two examples that follow the same pattern below: https://github.com/apache/kafka/pull/13529 https://github.com/apache/kafka/pull/13621 ### Summary of testing strategy (including rationale) Verified that all tests are passing
[GitHub] [kafka] showuon commented on pull request #13669: MINOR: Fix producer Callback comment
showuon commented on PR #13669: URL: https://github.com/apache/kafka/pull/13669#issuecomment-1537641986 CI build failed with infra issue. Rebuilding https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13669/3/
[GitHub] [kafka] showuon commented on pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
showuon commented on PR #13515: URL: https://github.com/apache/kafka/pull/13515#issuecomment-1537643776 Failed tests are unrelated:
```
Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationTransactionsTest.testOffsetTranslationBehindReplicationFlow()
Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor()
Build / JDK 17 and Scala 2.13 / kafka.api.TransactionsTest.testFailureToFenceEpoch(String).quorum=kraft
Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testDelayedConfigurationOperations()
```
[GitHub] [kafka] showuon merged pull request #13515: KAFKA-14752: Kafka examples improvements - producer changes
showuon merged PR #13515: URL: https://github.com/apache/kafka/pull/13515
[jira] [Commented] (KAFKA-14914) binarySearch in AbstactIndex may execute with infinite loop
[ https://issues.apache.org/jira/browse/KAFKA-14914?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17720372#comment-17720372 ] li xiangyuan commented on KAFKA-14914: -- After creating this Jira ticket we encountered the problem again. We tried to fetch the index file but failed. For now we have downgraded our AWS EC2 instances to c6 and have not hit the problem since. We are continuing to track this. Luke Chen (Jira) wrote on Fri, May 5, 2023 at 09:46: > binarySearch in AbstactIndex may execute with infinite loop > --- > > Key: KAFKA-14914 > URL: https://issues.apache.org/jira/browse/KAFKA-14914 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.4.0 >Reporter: li xiangyuan >Priority: Major > Attachments: stack.1.txt, stack.2.txt, stack.3.txt > > > Recently our servers in production environment may suddenly stop handle > request frequently(for now 3 times in less than 10 days), please check the > stack file uploaded, it show that 1 > ioThread(data-plane-kafka-request-handler-11) hold the ReadLock of > Partition's leaderIsrUpdateLock and keep run the binarySearch function, once > any thread(kafka-scheduler-2) need WriteMode Of this lock, then all requests > read this partition need ReadMode Lock will use out all ioThreads and then > this broker couldn't handle any request. > the 3 stack files are fetched with interval about 6 minute, with my > standpoint i just could think obviously the binarySearch function cause dead > lock and I presuppose maybe the index block values in offsetIndex (at least > in mmap) are not sorted. > > detail information: > this problem appear in 2 brokers > broker version: 2.4.0 > jvm: openjdk 11 > hardware: aws c7g 4xlarge, this is a arm64 server, we recently upgrade our > servers from c6g 4xlarge to this type, when we use c6g haven't meet this > problem, we don't know whether arm or aws c7g server have any problem. 
> other: once we restart broker, it will recover, so we doubt offset index file > may not corrupted and maybe something wrong with mmap. > plz give any suggestion solve this problem, thx.
[GitHub] [kafka] ableegoldman opened a new pull request, #13682: MINOR: improve error messages for join window store settings
ableegoldman opened a new pull request, #13682: URL: https://github.com/apache/kafka/pull/13682 Trying to configure the underlying state stores in a join is pretty awkward, and getting all the parameters right can be particularly frustrating. At a minimum, we should break up the verification into each individual check so that it's clear which step has failed. It would also help to actually include in the error message how to fix the store parameters to match what is expected, since this is not necessarily obvious (e.g. that the retention period should be **exactly** the sum of windowSize + gracePeriod)
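The idea of one check per setting, each with an actionable message, could look roughly like the sketch below. It is a standalone illustration, not the Kafka Streams code touched by the PR: the class `JoinStoreSettingsCheck`, its parameters, and the message wording are hypothetical, and the window-size and retain-duplicates checks are assumptions added alongside the retention rule stated above.

```java
import java.time.Duration;

// Hypothetical sketch: validates join window store settings one check at a time.
final class JoinStoreSettingsCheck {

    static void validate(Duration joinWindowSize,
                         Duration gracePeriod,
                         Duration storeWindowSize,
                         Duration storeRetention,
                         boolean storeRetainsDuplicates) {
        // Assumed check: join stores must keep duplicate keys.
        if (!storeRetainsDuplicates) {
            throw new IllegalArgumentException(
                "The join store must retain duplicates; enable duplicate retention on the store supplier.");
        }
        // Assumed check: the store window must match the join window.
        if (!storeWindowSize.equals(joinWindowSize)) {
            throw new IllegalArgumentException(
                "The store window size is " + storeWindowSize + " but the join window size is "
                    + joinWindowSize + "; set the store window size to " + joinWindowSize + ".");
        }
        // Check named in the PR description: retention == windowSize + gracePeriod, exactly.
        Duration expectedRetention = joinWindowSize.plus(gracePeriod);
        if (!storeRetention.equals(expectedRetention)) {
            throw new IllegalArgumentException(
                "The store retention period is " + storeRetention + " but must be exactly windowSize + gracePeriod = "
                    + expectedRetention + "; set the retention period to " + expectedRetention + ".");
        }
    }

    public static void main(String[] args) {
        // 10 minute window with 2 minute grace: retention has to be 12 minutes.
        validate(Duration.ofMinutes(10), Duration.ofMinutes(2),
                 Duration.ofMinutes(10), Duration.ofMinutes(12), true);
        System.out.println("settings accepted");
    }
}
```

Because each condition throws its own exception, a failure names the single parameter that is wrong and the value that would fix it, rather than reporting that the combination as a whole is invalid.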
[GitHub] [kafka] showuon commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
showuon commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1185891570 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.io.BufferedInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences: + * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to + * input stream. We may want to avoid pushing this to input stream because it's implementation maybe inefficient, + * e.g. the case of ZstdInputStream which allocates a new buffer from buffer pool, per skip call. + * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to + * create the intermediate buffer. + * + * Note that: + * - this class is not thread safe and shouldn't be used in scenarios where multiple threads access this. 
+ * - the implementation of this class is performance sensitive. Minor changes such as usage of ByteBuffer instead of byte[] + * can significantly impact performance, hence, proceed with caution. + */ +public class ChunkedBytesStream extends FilterInputStream { +/** + * Supplies the ByteBuffer which is used as intermediate buffer to store the chunk of output data. + */ +private final BufferSupplier bufferSupplier; +/** + * Intermediate buffer to store the chunk of output data. The ChunkedBytesStream is considered closed if + * this buffer is null. + */ +private byte[] intermediateBuf; +/** + * The index one greater than the index of the last valid byte in + * the buffer. + * This value is always in the range 0 through intermediateBuf.length; + * elements intermediateBuf[0] through intermediateBuf[count-1] + * contain buffered input data obtained + * from the underlying input stream. + */ +protected int count = 0; +/** + * The current position in the buffer. This is the index of the next + * character to be read from the buf array. + * + * This value is always in the range 0 + * through count. If it is less + * than count, then intermediateBuf[pos] + * is the next byte to be supplied as input; + * if it is equal to count, then + * the next read or skip + * operation will require more bytes to be + * read from the contained input stream. + */ +protected int pos = 0; +/** + * Reference for the intermediate buffer. This reference is only kept for releasing the buffer from the + * buffer supplier. 
+ */ +private final ByteBuffer intermediateBufRef; +/** + * Determines if the skip be pushed down + */ +private final boolean pushSkipToSourceStream; + +public ChunkedBytesStream(InputStream in, BufferSupplier bufferSupplier, int intermediateBufSize, boolean pushSkipToSourceStream) { +super(in); +this.bufferSupplier = bufferSupplier; +intermediateBufRef = bufferSupplier.get(intermediateBufSize); +if (!intermediateBufRef.hasArray() || (intermediateBufRef.arrayOffset() != 0)) { +throw new IllegalArgumentException("provided ByteBuffer lacks array or has non-zero arrayOffset"); +} +intermediateBuf = intermediateBufRef.array(); +this.pushSkipToSourceStream = pushSkipToSourceStream; +} + +/** + * Check to make sure that buffer has not been nulled out due to + * close; if not return it; + */ +private byte[] getBufIfOpen() throws IOException { +byte[] buffer = intermediateBuf; +if (buffer == null) +throw new IOException("Stream closed"); +return buffer; +} + +/** + * See Review Comment: See ? ## clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java: ## @@ -356,164 +346,100 @@ private static DefaultRecord readFrom(ByteBuffer buffer,
[GitHub] [kafka] showuon commented on a diff in pull request #13135: KAFKA-14633: Reduce data copy & buffer allocation during decompression
showuon commented on code in PR #13135: URL: https://github.com/apache/kafka/pull/13135#discussion_r1187071361 ## clients/src/main/java/org/apache/kafka/common/utils/ChunkedBytesStream.java: ## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.io.BufferedInputStream; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * ChunkedBytesStream is a copy of {@link ByteBufferInputStream} with the following differences: + * - Unlike {@link java.io.BufferedInputStream#skip(long)} this class could be configured to not push skip() to + * input stream. We may want to avoid pushing this to input stream because it's implementation maybe inefficient, + * e.g. the case of ZstdInputStream which allocates a new buffer from buffer pool, per skip call. + * - Unlike {@link java.io.BufferedInputStream}, which allocates an intermediate buffer, this uses a buffer supplier to Review Comment: Should we also mention we don't have `mark` as in `BufferedInputStream`? -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
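For context on the review comment above about `mark`: a minimal sketch, using only `java.io` classes, of the `mark`/`reset` behavior that `BufferedInputStream` provides and that a wrapper can opt out of. The `MarkDemo` and `NoMarkStream` names are hypothetical and are not the PR's `ChunkedBytesStream`; they only illustrate a `FilterInputStream` subclass that reports no mark support.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

public class MarkDemo {

    // Hypothetical wrapper (an assumption for illustration, not code from the PR):
    // it declares that mark/reset is unavailable, regardless of the wrapped stream.
    static class NoMarkStream extends FilterInputStream {
        NoMarkStream(InputStream in) {
            super(in);
        }

        @Override
        public boolean markSupported() {
            return false;
        }

        @Override
        public void mark(int readLimit) {
            // Intentionally a no-op: this stream keeps no replay buffer.
        }

        @Override
        public void reset() throws IOException {
            throw new IOException("mark/reset not supported");
        }
    }

    // Marks the current position, consumes two bytes, rewinds to the mark,
    // and returns the next byte read after the rewind.
    static int firstByteAfterReset(InputStream in) {
        try {
            in.mark(16);
            in.read();
            in.read();
            in.reset();
            return in.read();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = {10, 20, 30};

        InputStream buffered = new BufferedInputStream(new ByteArrayInputStream(data));
        System.out.println("BufferedInputStream markSupported: " + buffered.markSupported());
        System.out.println("byte after reset: " + firstByteAfterReset(buffered));

        InputStream noMark = new NoMarkStream(new ByteArrayInputStream(data));
        System.out.println("NoMarkStream markSupported: " + noMark.markSupported());
    }
}
```

Callers that rely on rewinding should check `markSupported()` first, which is why documenting the absence of `mark` in the class Javadoc may be worthwhile.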
[GitHub] [kafka] showuon merged pull request #13669: MINOR: Fix producer Callback comment
showuon merged PR #13669: URL: https://github.com/apache/kafka/pull/13669
[GitHub] [kafka] showuon commented on pull request #13672: MINOR: Print the cause of failure for PlaintextAdminIntegrationTest
showuon commented on PR #13672: URL: https://github.com/apache/kafka/pull/13672#issuecomment-1537829500 Failed tests are unrelated: ``` Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest.testReplicateSourceDefault() ```
[GitHub] [kafka] showuon merged pull request #13672: MINOR: Print the cause of failure for PlaintextAdminIntegrationTest
showuon merged PR #13672: URL: https://github.com/apache/kafka/pull/13672
[GitHub] [kafka] showuon commented on pull request #13660: KAFKA-14662: Update the ACL list in the doc
showuon commented on PR #13660: URL: https://github.com/apache/kafka/pull/13660#issuecomment-1537831054 Failed tests are unrelated: ``` Build / JDK 8 and Scala 2.12 / integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor() Build / JDK 11 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[1] Type=Raft-Combined, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, Security=PLAINTEXT Build / JDK 11 and Scala 2.13 / org.apache.kafka.tools.MetadataQuorumCommandTest.[2] Type=Raft-Isolated, Name=testDescribeQuorumReplicationSuccessful, MetadataVersion=3.5-IV2, Security=PLAINTEXT ```
[GitHub] [kafka] showuon merged pull request #13660: KAFKA-14662: Update the ACL list in the doc
showuon merged PR #13660: URL: https://github.com/apache/kafka/pull/13660
[jira] [Resolved] (KAFKA-14662) ACL listings in documentation are out of date
[ https://issues.apache.org/jira/browse/KAFKA-14662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen resolved KAFKA-14662. --- Fix Version/s: 3.6.0 Resolution: Fixed > ACL listings in documentation are out of date > - > > Key: KAFKA-14662 > URL: https://issues.apache.org/jira/browse/KAFKA-14662 > Project: Kafka > Issue Type: Bug > Components: core, docs >Reporter: Mickael Maison >Assignee: Gantigmaa Selenge >Priority: Major > Fix For: 3.6.0 > > > ACLs listed in > https://kafka.apache.org/documentation/#operations_resources_and_protocols > are out of date. They only cover API keys up to 47 (OffsetDelete) and don't > include DescribeClientQuotas, AlterClientQuotas, > DescribeUserScramCredentials, AlterUserScramCredentials, DescribeQuorum, > AlterPartition, UpdateFeatures, DescribeCluster, DescribeProducers, > UnregisterBroker, DescribeTransactions, ListTransactions, AllocateProducerIds. > This is hard to keep up to date so we should consider whether this could be > generated automatically.
[GitHub] [kafka] showuon opened a new pull request, #13683: MINOR: 3.3 test
showuon opened a new pull request, #13683: URL: https://github.com/apache/kafka/pull/13683 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon closed pull request #13683: MINOR: 3.3 test
showuon closed pull request #13683: MINOR: 3.3 test URL: https://github.com/apache/kafka/pull/13683