[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514882473 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests
ning2008wisc commented on pull request #9224: URL: https://github.com/apache/kafka/pull/9224#issuecomment-719201216 A kind reminder for additional feedback and comments, based on my response above. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514872477 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version) +case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version) +case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version) +case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version) +case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version) Review comment: Actually, I found a workaround for this mapping and removed it all together, so there'd be one less apikey mapping to maintain. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version) +case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version) +case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version) +case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version) +case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version) Review comment: I guess this is related with the inconsistency issue, but I tried not to make too many changes. The constructors with the `data` parameters were private in some cases, so I used the `toStruct`. But for consistency, I'll change to use `toStruct` instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version) +case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version) +case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version) +case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version) +case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version) Review comment: I guess this is related with the inconsistency issue, but I tried not to make too many changes. The constructors with the `data` parameters were private in some cases, so I used the `toStruct`. But for consistency, I'll change to use the Builder class instead. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514861991 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514861389 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] chia7712 commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
chia7712 commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r514819326 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ## @@ -210,65 +142,42 @@ public String toString() { } } +/** + * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate + * the reference to ByteBuffer but those metadata are still useful. + */ private final short acks; private final int timeout; private final String transactionalId; - -private final Map partitionSizes; - +// visible for testing +final Map partitionSizes; +private boolean hasTransactionalRecords = false; +private boolean hasIdempotentRecords = false; // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is // put in the purgatory (due to client throttling, it can take a while before the response is sent). // Care should be taken in methods that use this field. 
-private volatile Map partitionRecords; -private boolean hasTransactionalRecords = false; -private boolean hasIdempotentRecords = false; - -private ProduceRequest(short version, short acks, int timeout, Map partitionRecords, String transactionalId) { -super(ApiKeys.PRODUCE, version); -this.acks = acks; -this.timeout = timeout; - -this.transactionalId = transactionalId; -this.partitionRecords = partitionRecords; -this.partitionSizes = createPartitionSizes(partitionRecords); +private volatile ProduceRequestData data; -for (MemoryRecords records : partitionRecords.values()) { -setFlags(records); -} -} - -private static Map createPartitionSizes(Map partitionRecords) { -Map result = new HashMap<>(partitionRecords.size()); -for (Map.Entry entry : partitionRecords.entrySet()) -result.put(entry.getKey(), entry.getValue().sizeInBytes()); -return result; -} - -public ProduceRequest(Struct struct, short version) { +public ProduceRequest(ProduceRequestData produceRequestData, short version) { super(ApiKeys.PRODUCE, version); -partitionRecords = new HashMap<>(); -for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { -Struct topicData = (Struct) topicDataObj; -String topic = topicData.get(TOPIC_NAME); -for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) { -Struct partitionResponse = (Struct) partitionResponseObj; -int partition = partitionResponse.get(PARTITION_ID); -MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME); -setFlags(records); -partitionRecords.put(new TopicPartition(topic, partition), records); -} -} -partitionSizes = createPartitionSizes(partitionRecords); -acks = struct.getShort(ACKS_KEY_NAME); -timeout = struct.getInt(TIMEOUT_KEY_NAME); -transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null); -} - -private void setFlags(MemoryRecords records) { -Iterator iterator = records.batches().iterator(); -MutableRecordBatch entry = iterator.next(); -hasIdempotentRecords = 
hasIdempotentRecords || entry.hasProducerId(); -hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional(); +this.data = produceRequestData; +this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions() +.forEach(partitionProduceData -> { +MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records()); +Iterator iterator = records.batches().iterator(); +MutableRecordBatch entry = iterator.next(); +hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId(); Review comment: The clients module has some tests which depend on it, so I moved the helper to `RequestUtils`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming opened a new pull request #9535: MINOR: remove redundant return statement
dengziming opened a new pull request #9535: URL: https://github.com/apache/kafka/pull/9535 The result of `GroupMetadataManager.storeOffsets` is `Unit`, so remove the redundant return statement. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-6217) Allow consumers to read messages from LEO
[ https://issues.apache.org/jira/browse/KAFKA-6217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ashok Bala updated KAFKA-6217: -- Description: we have a use case(real time customer facing application) to support high throughput(reduced latency) & high volume of transactions through Kafka. During performance testing, we noted that consumer listening to single replicated partition topics get messages faster than the consumers listening to replicated partitions. Make messages to be visible immediately to consumer rather than waiting for high water mark. Make change in consumer & core, based on consumer configuration, LEO messages to be visible to consumer like followers. It is the risk of consumer to lose messages during fail over of leader for which we are OK. was: we have a use case(real time customer facing application) to support high throughput(reduced latency) & high volume of transactions through Kafka. During performance testing, we noted that consumer listening to single partition topics get messages faster than the consumers listening to replicated partitions. Make messages to be visible immediately to consumer rather than waiting for high water mark. Make change in consumer & core, based on consumer configuration, LEO messages to be visible to consumer like followers. It is the risk of consumer to lose messages during fail over of leader for which we are OK. > Allow consumers to read messages from LEO > - > > Key: KAFKA-6217 > URL: https://issues.apache.org/jira/browse/KAFKA-6217 > Project: Kafka > Issue Type: Improvement > Components: consumer, core >Reporter: Ashok Bala >Priority: Major > > we have a use case(real time customer facing application) to support high > throughput(reduced latency) & high volume of transactions through Kafka. > During performance testing, we noted that consumer listening to single > replicated partition topics get messages faster than the consumers listening > to replicated partitions. 
> Make messages to be visible immediately to consumer rather than waiting for > high water mark. Make change in consumer & core, based on consumer > configuration, LEO messages to be visible to consumer like followers. It is > the risk of consumer to lose messages during fail over of leader for which we > are OK. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman opened a new pull request #9534: KAFKA-10664: Delete existing checkpoint when writing empty offsets
ableegoldman opened a new pull request #9534: URL: https://github.com/apache/kafka/pull/9534 ...otherwise we can get stuck in an endless loop of initializing corrupted offsets, hitting OffsetOutOfRangeException and closing the task, then reviving the task with those same corrupted offsets. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers
[ https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223324#comment-17223324 ] Matthias J. Sax commented on KAFKA-10635: - \cc [~hachikuji] [~bob-barrett] > Streams application fails with OutOfOrderSequenceException after rolling > restarts of brokers > > > Key: KAFKA-10635 > URL: https://issues.apache.org/jira/browse/KAFKA-10635 > Project: Kafka > Issue Type: Bug > Components: core, producer >Affects Versions: 2.5.1 >Reporter: Peeraya Maetasatidsuk >Priority: Blocker > > We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a > rolling restart of the brokers after installing the new version. After the > restarts we notice one of our streams app (client version 2.4.1) fails with > OutOfOrderSequenceException: > > {code:java} > ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected > error. Record: a_record, destination topic: > topic-name-Aggregation-repartition > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. 
> ERROR [2020-10-13 22:52:21,413] > [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread > [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the > following error: org.apache.kafka.streams.errors.StreamsException: task > [1_39] Abort sending since an error caught with a previous record (timestamp > 1602654659000) to topic topic-name-Aggregation-repartition due to > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number.at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204) > at > org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) > at > org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) > at > org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674) > at > org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596) > at > org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) > at > org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798) > at > org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109) > at > org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569) > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at > 
org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) > at java.base/java.lang.Thread.run(Thread.java:834)Caused by: > org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker > received an out of order sequence number. > {code} > We see a corresponding error on the broker side: > {code:java} > [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error > processing append operation on partition > topic-name-Aggregation-repartition-52 > (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException: > Out of order sequence number for producerId 2819098 at offset 1156041 in > partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), > -1 (current end sequence number) > {code} > We are able to reproduce this many times and it happens regardless of whether > the broker shutdown (at restart) is clean or unclean. However, when we > rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling > restarts, we don't see this error on the streams application at all. This is > blocking us from upgrading our broker version. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10664) Streams fails to overwrite corrupted offsets leading to infinite OffsetOutOfRangeException loop
A. Sophie Blee-Goldman created KAFKA-10664: -- Summary: Streams fails to overwrite corrupted offsets leading to infinite OffsetOutOfRangeException loop Key: KAFKA-10664 URL: https://issues.apache.org/jira/browse/KAFKA-10664 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.7.0 Reporter: A. Sophie Blee-Goldman Assignee: A. Sophie Blee-Goldman Fix For: 2.7.0 In KAFKA-10391 we fixed an issue where Streams could get stuck in an infinite loop of OffsetOutOfRangeException/TaskCorruptedException due to re-initializing the corrupted offsets from the checkpoint after each revival. The fix we applied was to remove the corrupted offsets from the state manager and then force it to write a new checkpoint file without those offsets during revival. Unfortunately, we missed that there's an optimization in OffsetCheckpoint#write to just return without writing anything when there are no offsets. So if a task doesn't have any offsets that _aren't_ corrupted, it will skip overwriting the corrupted checkpoint. Probably we should just fix the optimization in OffsetCheckpoint so that it deletes the current checkpoint when there are no offsets to write. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10645) Forwarding a record from a punctuator sometimes it results in a NullPointerException
[ https://issues.apache.org/jira/browse/KAFKA-10645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222393#comment-17222393 ] Matthias J. Sax edited comment on KAFKA-10645 at 10/30/20, 1:31 AM: [~showuon] While my PR adds an additional check, it is still unclear how a `null` header was added in the first place? {quote}by passing the headers Array/Iterable to the constructor. {quote} Yes, but if the input record does not have any headers, the array should have zero entries? It's still unclear to me where a `null` entry comes from? [~filmac79] I have no idea atm what could cause the `ArrayIndexOutOfBoundsException`. In L67 we call `headers.add()` that should add the header to the end of the list. It seems that the `ArrayList.add` does something funny? There should not be any synchronization issue, as the punctuate() and process() are called by the same thread? Well, there was some other issue with regard to concurrency though: https://issues.apache.org/jira/browse/KAFKA-9584 – maybe this is the root cause but you just hit a different exception? was (Author: mjsax): [~showuon] While my PR adds an additional check, it is still unclear how a `null` header was added in the first place? {quote}by passing the headers Array/Iterable to the constructor. {quote} Yes, but if the input records does not have any headers, the array should have zero entries? It's still unclear to me, there a `null` entry comes from? [~filmac79] I have no idea atm what could cause the `ArrayIndexOutOfBoundsException` atm. In L67 we call `headers.add()` that should add the header to the end of the list. It seem that the `ArrayList.add` does something funny? There should not be any synchronization issue, as the punctuate() and process() are called by the same thread? Well, there was some other issue with regard to concurrency though: https://issues.apache.org/jira/browse/KAFKA-9584 – maybe this is the root cause but you just hit a different exception? 
> Forwarding a record from a punctuator sometimes it results in a > NullPointerException > > > Key: KAFKA-10645 > URL: https://issues.apache.org/jira/browse/KAFKA-10645 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.5.0 >Reporter: Filippo Machi >Assignee: Matthias J. Sax >Priority: Major > > Hello, > I am working on a java kafka stream application (v. 2.5.0) running on a > kubernetes cluster. > It´s a springboot application running with java 8. > With the last upgrade to version 2.5.0 I started to see into the logs some > NullPointerException that are happening when forwarding a record from a > punctuator. > This is the stacktrace of the exception > {code:java} > Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_2] Abort > sending since an error caught with a previous record (timestamp > 1603721062667) to topic reply-reminder-push-sender due to > java.lang.NullPointerException\tat > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:240)\tat > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)\tat > > org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)\tat > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)\tat > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)\tat > > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)\t... 
> 24 common frames omittedCaused by: java.lang.NullPointerException: null\tat > org.apache.kafka.common.record.DefaultRecord.sizeOf(DefaultRecord.java:613)\tat > > org.apache.kafka.common.record.DefaultRecord.recordSizeUpperBound(DefaultRecord.java:633)\tat > > org.apache.kafka.common.record.DefaultRecordBatch.estimateBatchSizeUpperBound(DefaultRecordBatch.java:534)\tat > > org.apache.kafka.common.record.AbstractRecords.estimateSizeInBytesUpperBound(AbstractRecords.java:135)\tat > > org.apache.kafka.common.record.AbstractRecords.estimateSizeInBytesUpperBound(AbstractRecords.java:125)\tat > > org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:914)\tat > > org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)\tat > > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:181)\t... > 29 common frames omitted > {code} > Checking the code, it looks like it happens calculating the size of the > record. There is one header that is null but I don´t think I can control > those headers right? > Thanks a
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514636997 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514636692 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -162,7 +161,7 @@ object RequestChannel extends Logging { } } -trace(s"Processor $processor received request: ${requestDesc(true)}") +trace(s"Processor $processor received request: ${RequestConvertToJson.requestDesc(header, loggableRequest, true).toString}") Review comment: The macro does check `isTraceEnabled`, and this check is done before the string is evaluated, so it wouldn't be unnecessarily computed. But I'm also fine with gating it. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
lbradstreet commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514635784 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] mjsax merged pull request #9513: MINOR: improve `null` checks for headers
mjsax merged pull request #9513: URL: https://github.com/apache/kafka/pull/9513 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9513: MINOR: improve `null` checks for headers
mjsax commented on pull request #9513: URL: https://github.com/apache/kafka/pull/9513#issuecomment-719085653 > However, it may be overkill since users don't create ConsumerRecord. Yeah, I tend to agree. Would not hurt to add it, but should not be strictly necessary. Will merge this as-is. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9530: MINOR: Fix version verification in system test
ableegoldman commented on pull request #9530: URL: https://github.com/apache/kafka/pull/9530#issuecomment-719085324 Cherrypicked to 2.6 & 2.7 cc/ @bbejeck This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #9518: KAFKA-10645: Add null check to the array/Iterable values in RecordHeaders constructor
mjsax commented on pull request #9518: URL: https://github.com/apache/kafka/pull/9518#issuecomment-719084829 @showuon Seems we worked on this in parallel. Sorry for that... This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514621561 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891 ## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version) +case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version) +case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version) +case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version) +case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version) Review comment: I guess this is related with the consistency issue, but I tried not to make too many changes. The constructors with the `data` parameters were private in some cases, so I used the `toStruct`. But for consistency, I'll change to use one or the other rather than alternate between the two. 
## File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala ## @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.network + +import java.util.HashMap + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message._ +import org.junit.Test +import org.apache.kafka.common.protocol.ApiKeys +import org.apache.kafka.common.record.MemoryRecords +import org.apache.kafka.common.requests._ + +import scala.collection.mutable.ArrayBuffer + +class RequestConvertToJsonTest { + + def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match { +case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build() +case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version) +case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version) +case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version) +case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version) +case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new
[GitHub] [kafka] ableegoldman merged pull request #9530: MINOR: Fix version verification in system test
ableegoldman merged pull request #9530: URL: https://github.com/apache/kafka/pull/9530 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #9530: MINOR: Fix version verification in system test
ableegoldman commented on pull request #9530: URL: https://github.com/apache/kafka/pull/9530#issuecomment-719074084 One flaky test failed: `EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]` Confirmed system tests passed. Merging to trunk. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514617626 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514614216 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514610499 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values
vvcephei commented on pull request #9467: URL: https://github.com/apache/kafka/pull/9467#issuecomment-719063925 Merged to 2.6 (cc @mimaison ) Thanks again for the contribution, @thake ! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed
[ https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10515: - Fix Version/s: 2.6.1 > NPE: Foreign key join serde may not be initialized with default serde if > application is distributed > --- > > Key: KAFKA-10515 > URL: https://issues.apache.org/jira/browse/KAFKA-10515 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.6.0, 2.5.1 >Reporter: Thorsten Hake >Priority: Critical > Fix For: 2.7.0, 2.6.1 > > > The fix of KAFKA-9517 fixed the initialization of the foreign key joins > serdes for KStream applications that do not run distributed over multiple > instances. > However, if an application runs distributed over multiple instances, the > foreign key join serdes may still not be initialized leading to the following > NPE: > {noformat} > Encountered the following error during > processing:java.lang.NullPointerException: null > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85) > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52) > at > org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59) > at > org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50) > at > org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27) > at > org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) > at > 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487) > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102) > at > org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180) > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133) > at > org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104) > at > org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806) > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383) > at > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802) > at > 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670){noformat} > This happens because the processors for foreign key joins will be distributed > across multiple tasks. The serde will only be initialized with the default > serde during the initialization of the task containing the sink node > ("subscription-registration-sink"). So if the task containing the > SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to > the same instance as the task
[GitHub] [kafka] vvcephei merged pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values
vvcephei merged pull request #9467: URL: https://github.com/apache/kafka/pull/9467 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
hachikuji commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r514594596 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ## @@ -210,65 +142,42 @@ public String toString() { } } +/** + * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate + * the reference to ByteBuffer but those metadata are still useful. + */ private final short acks; private final int timeout; private final String transactionalId; - -private final Map partitionSizes; - +// visible for testing +final Map partitionSizes; +private boolean hasTransactionalRecords = false; +private boolean hasIdempotentRecords = false; // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is // put in the purgatory (due to client throttling, it can take a while before the response is sent). // Care should be taken in methods that use this field. 
-private volatile Map partitionRecords; -private boolean hasTransactionalRecords = false; -private boolean hasIdempotentRecords = false; - -private ProduceRequest(short version, short acks, int timeout, Map partitionRecords, String transactionalId) { -super(ApiKeys.PRODUCE, version); -this.acks = acks; -this.timeout = timeout; - -this.transactionalId = transactionalId; -this.partitionRecords = partitionRecords; -this.partitionSizes = createPartitionSizes(partitionRecords); +private volatile ProduceRequestData data; -for (MemoryRecords records : partitionRecords.values()) { -setFlags(records); -} -} - -private static Map createPartitionSizes(Map partitionRecords) { -Map result = new HashMap<>(partitionRecords.size()); -for (Map.Entry entry : partitionRecords.entrySet()) -result.put(entry.getKey(), entry.getValue().sizeInBytes()); -return result; -} - -public ProduceRequest(Struct struct, short version) { +public ProduceRequest(ProduceRequestData produceRequestData, short version) { super(ApiKeys.PRODUCE, version); -partitionRecords = new HashMap<>(); -for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { -Struct topicData = (Struct) topicDataObj; -String topic = topicData.get(TOPIC_NAME); -for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) { -Struct partitionResponse = (Struct) partitionResponseObj; -int partition = partitionResponse.get(PARTITION_ID); -MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME); -setFlags(records); -partitionRecords.put(new TopicPartition(topic, partition), records); -} -} -partitionSizes = createPartitionSizes(partitionRecords); -acks = struct.getShort(ACKS_KEY_NAME); -timeout = struct.getInt(TIMEOUT_KEY_NAME); -transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null); -} - -private void setFlags(MemoryRecords records) { -Iterator iterator = records.batches().iterator(); -MutableRecordBatch entry = iterator.next(); -hasIdempotentRecords = 
hasIdempotentRecords || entry.hasProducerId(); -hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional(); +this.data = produceRequestData; +this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions() +.forEach(partitionProduceData -> { +MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records()); +Iterator iterator = records.batches().iterator(); +MutableRecordBatch entry = iterator.next(); +hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId(); Review comment: On the other hand, we might want to move this logic into a helper in `KafkaApis` so that these objects are dedicated only to serialization logic. Eventually we'll want to get rid of `ProduceRequest` and just use `ProduceRequestData`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gitlw opened a new pull request #9533: Show log end offset during truncation to help estimate data loss during ULE
gitlw opened a new pull request #9533: URL: https://github.com/apache/kafka/pull/9533 During Unclean Leader Election, there could be data loss due to truncation at the resigned leader. This PR tries to add more logs to understand the scale of message loss during an unclean leader election. Suppose there are 3 brokers that have replicas for a given partition: Broker A (leader) with largest offset 9 (log end offset 10) Broker B (follower) with largest offset 4 (log end offset 5) Broker C (follower) with largest offset 1 (log end offset 2) Only the leader A is in the ISR with B and C lagging behind. Now an unclean leader election causes the leadership to be transferred to C. Broker A would need to truncate 8 messages, and Broker B 3 messages. Case 1: if these messages have been produced with acks=0 or 1, then clients would experience 8 lost messages. Case 2: if the client is using acks=all and the partition's minISR setting is 2, and further let's assume broker B dropped out of the ISR after receiving the message with offset 4, then only the messages with offset<=4 have been acked to the client. The truncation effectively causes the client to lose 3 messages. Knowing the exact amount of data loss involves knowing the client's acks setting when the messages are produced, and also whether the messages have been sufficiently replicated according to the MinISR setting. Without getting too involved, this PR reduces the requirement from getting the exact data loss numbers to getting an ESTIMATE of the data loss. Specifically, this PR adds logs during truncation to show the log end offset, number of messages truncated, and number of bytes truncated. *Summary of testing strategy (including rationale) for the feature or bug fix.
Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514585353 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -26,30 +24,53 @@ interface Listener { /** - * Callback which is invoked when records written through {@link #scheduleAppend(int, List)} - * become committed. + * Callback which is invoked for all records committed to the log. + * It is the responsibility of the caller to invoke {@link BatchReader#close()} + * after consuming the reader. * * Note that there is not a one-to-one correspondence between writes through * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation * is free to batch together the records from multiple append calls provided * that batch boundaries are respected. This means that each batch specified * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of - * a batch passed to {@link #handleCommit(int, long, List)}. + * a batch provided by the {@link BatchReader}. + * + * @param reader reader instance which must be iterated and closed + */ +void handleCommit(BatchReader reader); + +/** + * Invoked after this node has become a leader. This is only called after + * all commits up to the start of the leader's epoch have been sent to + * {@link #handleCommit(BatchReader)}. + * + * After becoming a leader, the client is eligible to write to the log + * using {@link #scheduleAppend(int, List)}. * - * @param epoch the epoch in which the write was accepted - * @param lastOffset the offset of the last record in the record list - * @param records the set of records that were committed + * @param epoch the claimed leader epoch */ -void handleCommit(int epoch, long lastOffset, List records); +default void handleClaim(int epoch) {} Review comment: Yeah, I considered using `handleBecomeLeader` and `handleResignLeader`. 
In the end, I decided to use the more concise `handleClaim` and `handleResign` names which are used in the kip-500 branch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514585353 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -26,30 +24,53 @@ interface Listener { /** - * Callback which is invoked when records written through {@link #scheduleAppend(int, List)} - * become committed. + * Callback which is invoked for all records committed to the log. + * It is the responsibility of the caller to invoke {@link BatchReader#close()} + * after consuming the reader. * * Note that there is not a one-to-one correspondence between writes through * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation * is free to batch together the records from multiple append calls provided * that batch boundaries are respected. This means that each batch specified * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of - * a batch passed to {@link #handleCommit(int, long, List)}. + * a batch provided by the {@link BatchReader}. + * + * @param reader reader instance which must be iterated and closed + */ +void handleCommit(BatchReader reader); + +/** + * Invoked after this node has become a leader. This is only called after + * all commits up to the start of the leader's epoch have been sent to + * {@link #handleCommit(BatchReader)}. + * + * After becoming a leader, the client is eligible to write to the log + * using {@link #scheduleAppend(int, List)}. * - * @param epoch the epoch in which the write was accepted - * @param lastOffset the offset of the last record in the record list - * @param records the set of records that were committed + * @param epoch the claimed leader epoch */ -void handleCommit(int epoch, long lastOffset, List records); +default void handleClaim(int epoch) {} Review comment: Yeah, I considered using `handleBecomeLeader` and `handleResignLeader`. 
In the end, I decided to use the more concise `handleClaim` and `handleResign`, which are the names already used in the kip-500 branch. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514584469 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) throws IOException { } } +private void pollListeners() { +// Register any listeners added since the last poll +while (!pendingListeners.isEmpty()) { +Listener listener = pendingListeners.poll(); +listenerContexts.add(new ListenerContext(listener)); +} + +// Check listener progress to see if reads are expected +quorum.highWatermark().ifPresent(highWatermarkMetadata -> { +long highWatermark = highWatermarkMetadata.offset; + +List listenersToUpdate = listenerContexts.stream() +.filter(listenerContext -> { +OptionalLong nextExpectedOffset = listenerContext.nextExpectedOffset(); +return nextExpectedOffset.isPresent() && nextExpectedOffset.getAsLong() < highWatermark; +}) +.collect(Collectors.toList()); + +maybeFireHandleCommit(listenersToUpdate, highWatermarkMetadata.offset); +}); +} + public void poll() throws IOException { GracefulShutdown gracefulShutdown = shutdown.get(); if (gracefulShutdown != null) { pollShutdown(gracefulShutdown); } else { +pollListeners(); Review comment: Hmm, that's a fair question. I think the listeners will tend to get new data in two cases: 1) high watermark advanced, or 2) a previous read completes. In the first case, the high watermark only advances in response to a request, so there should be no delay. In the second case, we call `wakeup()` to take us out of the network poll, so I think there also should be no delay. Can you think of a case where there would be a delay? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck commented on a change in pull request #9532: MINOR: Move upgraded docs from site to kafak docs
bbejeck commented on a change in pull request #9532: URL: https://github.com/apache/kafka/pull/9532#discussion_r514580970 ## File path: docs/upgrade.html ## @@ -21,6 +21,48 @@ Notable changes in 2.8.0 +Upgrading to 2.7.0 from any version 0.8.x through 2.6.x Review comment: This whole section is new, but should be standard instructions ## File path: docs/documentation.html ## @@ -22,51 +22,56 @@ - - - + + + + + + + + + + Documentation Kafka 2.8 Documentation Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X, 2.7.X. - - -1. Getting Started - 1.1 Introduction + 1. Getting Started + 1.1 Introduction - 1.2 Use Cases + 1.2 Use Cases - 1.3 Quick Start - - 1.4 Ecosystem + 1.3 Quick Start + + 1.4 Ecosystem - 1.5 Upgrading From Previous Versions + 1.5 Upgrading From Previous Versions -2. APIs +2. APIs -3. Configuration +3. Configuration -4. Design +4. Design -5. Implementation +5. Implementation -6. Operations +6. Operations -7. Security +7. Security -8. Kafka Connect +8. Kafka Connect Review comment: Current 2.6 docs part of AK site update ## File path: docs/documentation.html ## @@ -22,51 +22,56 @@ - - - + + + + + + + + + + Review comment: Current docs, part of AK site upgrade ## File path: docs/upgrade.html ## @@ -93,8 +135,49 @@ Notable changes in 2 https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala;>KIP-616 +Upgrading to 2.6.0 from any version 0.8.x through 2.5.x + +If you are upgrading from a version prior to 2.1.x, please see the note below about the change to the schema used to store consumer offsets. +Once you have changed the inter.broker.protocol.version to the latest version, it will not be possible to downgrade to a version prior to 2.1. + +For a rolling upgrade: -Notable changes in 2.6.0 + + Update server.properties on all brokers and add the following properties. 
CURRENT_KAFKA_VERSION refers to the version you +are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the message format version currently in use. If you have previously +overridden the message format version, you should keep its current value. Alternatively, if you are upgrading from a version prior +to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to match CURRENT_KAFKA_VERSION. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g., 2.5, 2.4, etc.) +log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION (See potential performance impact +following the upgrade for the details on what this configuration does.) + +If you are upgrading from version 0.11.0.x or above, and you have not overridden the message format, then you only need to override +the inter-broker protocol version. + +inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g., 2.5, 2.4, etc.) + + + Upgrade the brokers one at a time: shut down the broker, update the code, and restart it. Once you have done so, the +brokers will be running the latest version and you can verify that the cluster's behavior and performance meets expectations. +It is still possible to downgrade at this point if there are any problems. + + Once the cluster's behavior and performance has been verified, bump the protocol version by editing +inter.broker.protocol.version and setting it to 2.6. + + Restart the brokers one by one for the new protocol version to take effect. Once the brokers begin using the latest +protocol version, it will no longer be possible to downgrade the cluster to an older version. + + If you have overridden the message format version as instructed above, then you need to do one more rolling restart to +upgrade it to its latest version. Once all (or most) consumers have been upgraded to 0.11.0 or later, +change log.message.format.version to 2.6 on each broker and restart them one by one. 
Note that the older Scala clients, +which are no longer maintained, do not support the message format introduced in 0.11, so to avoid conversion costs +(or to take advantage of exactly once semantics), +the newer Java clients must be used. + + + +Notable changes in 2.6.0 Review comment: Current 2.6 docs ## File path: docs/upgrade.html
[GitHub] [kafka] bbejeck commented on pull request #9532: MINOR: Move upgraded docs from site to kafak docs
bbejeck commented on pull request #9532: URL: https://github.com/apache/kafka/pull/9532#issuecomment-719035513 Note that most of this PR already exists in 2.6 and doesn't need a close look. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
lbradstreet commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514577992 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
lbradstreet commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514577021 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
lbradstreet commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514576105 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
lbradstreet commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514575372 ## File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala ## @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.network + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode} +import kafka.network.RequestChannel.{Response, Session} +import org.apache.kafka.common.message._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch +import org.apache.kafka.common.requests._ +import org.apache.kafka.common.utils.CollectionUtils + +import scala.jdk.CollectionConverters._ + +object RequestConvertToJson { + def request(request: AbstractRequest, verbose: Boolean): JsonNode = { +request match { + case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version()) + case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version()) + case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version()) + case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version()) + case 
req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version()) + case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version()) + case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version()) + case res: DescribeUserScramCredentialsRequest =>
[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
lbradstreet commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514574814 ## File path: core/src/main/scala/kafka/network/RequestChannel.scala ## @@ -162,7 +161,7 @@ object RequestChannel extends Logging { } } -trace(s"Processor $processor received request: ${requestDesc(true)}") +trace(s"Processor $processor received request: ${RequestConvertToJson.requestDesc(header, loggableRequest, true).toString}") Review comment: Would we unnecessarily calculate it here? I think the macro will avoid it in this case, but I'm fine with gating it anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331 add a streams handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r514566700 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -346,26 +351,89 @@ public void setStateListener(final KafkaStreams.StateListener listener) { * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly * terminates due to an uncaught exception. * - * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler + * @param uncaughtExceptionHandler the uncaught exception handler for all internal threads; {@code null} deletes the current handler * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * + * @Deprecated Since 2.7.0. Use {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead. + * */ -public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { +public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { synchronized (stateLock) { if (state == State.CREATED) { for (final StreamThread thread : threads) { -thread.setUncaughtExceptionHandler(eh); + thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); } if (globalStreamThread != null) { -globalStreamThread.setUncaughtExceptionHandler(eh); + globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler); } } else { throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + -"Current state is: " + state); +"Current state is: " + state); } } } +/** + * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} + * throws an unexpected exception. 
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they + * might be exceptions thrown by your code, for example a NullPointerException thrown from your processor + * logic. + * + * Note, this handler must be threadsafe, since it will be shared among all threads, and invoked from any + * thread that encounters such an exception. + * + * @param streamsUncaughtExceptionHandler the uncaught exception handler of type {@link StreamsUncaughtExceptionHandler} for all internal threads; {@code null} deletes the current handler + * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * @throws NullPointerException @NotNull if streamsUncaughtExceptionHandler is null. + */ +public void setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler handler = exception -> handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler); +synchronized (stateLock) { +if (state == State.CREATED) { +Objects.requireNonNull(streamsUncaughtExceptionHandler); +for (final StreamThread thread : threads) { +thread.setStreamsUncaughtExceptionHandler(handler); +} +if (globalStreamThread != null) { +globalStreamThread.setUncaughtExceptionHandler(handler); +} +} else { +throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + +"Current state is: " + state); +} +} +} + +private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse handleStreamsUncaughtException(final Throwable e, + final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) { +final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse action = streamsUncaughtExceptionHandler.handle(e); +switch (action) { +//case REPLACE_STREAM_THREAD: Review comment: It will. I don't know if we should merge as comment or just add it later This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331 add a streams handler
wcarlson5 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r514567028 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { +/** + * Inspect the exception received in a stream thread and respond with an action. + * @param exception the actual exception + */ +StreamThreadExceptionResponse handle(final Throwable exception); + +/** + * Enumeration that describes the response from the exception handler. + */ +enum StreamThreadExceptionResponse { +//REPLACE_THREAD(0, "REPLACE_THREAD"), Review comment: Same as the other use in KS This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9487: KAFKA-9331 add a streams handler
lct45 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r514535737 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ## @@ -282,6 +284,15 @@ public boolean isRunning() { private final Admin adminClient; private final InternalTopologyBuilder builder; + Review comment: two new lines in a row ## File path: streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java ## @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +public interface StreamsUncaughtExceptionHandler { +/** + * Inspect the exception received in a stream thread and respond with an action. + * @param exception the actual exception + */ +StreamThreadExceptionResponse handle(final Throwable exception); + +/** + * Enumeration that describes the response from the exception handler. + */ +enum StreamThreadExceptionResponse { +//REPLACE_THREAD(0, "REPLACE_THREAD"), Review comment: Supposed to be here? 
## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ## @@ -616,13 +623,22 @@ public void shouldNotSetGlobalRestoreListenerAfterStarting() { public void shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() { final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); streams.start(); -try { -streams.setUncaughtExceptionHandler(null); -fail("Should throw IllegalStateException"); -} catch (final IllegalStateException e) { -// expected -} +assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); +} + +@Test +public void shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreateState() { +final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); +streams.start(); +assertThrows(IllegalStateException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); + } +@Test +public void shouldThrowNullPointerExceptionSettingStreamsUncaughtExceptionHandlerIfNull() { +final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); +assertThrows(NullPointerException.class, () -> streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null)); +} + Review comment: extra line ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -346,26 +351,89 @@ public void setStateListener(final KafkaStreams.StateListener listener) { * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly * terminates due to an uncaught exception. 
* - * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler + * @param uncaughtExceptionHandler the uncaught exception handler for all internal threads; {@code null} deletes the current handler * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * + * @Deprecated Since 2.7.0. Use {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead. + * */ -public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { +public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { synchronized (stateLock) { if (state == State.CREATED) { for (final StreamThread thread : threads) { -thread.setUncaughtExceptionHandler(eh); + thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); } if (globalStreamThread != null) { -
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514539737 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -329,8 +387,9 @@ private void appendLeaderChangeMessage(LeaderState state, long currentTimeMs) { } private void flushLeaderLog(LeaderState state, long currentTimeMs) { -log.flush(); +// We update the end offset before flushing so that parked fetches can return sooner Review comment: Yeah, it's ok for followers to see uncommitted or even unflushed data. The main thing is that we avoid advancing the high watermark until the fsync completes. Note that this is the main reason that we had to do KAFKA-10527. Without this fix, it was possible for the leader to continue in the same epoch after a restart, which means that it could lose and overwrite unflushed data. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514537694 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to save some work +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +continue; +} + +long nextExpectedOffset = 
nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset < highWatermark) { +LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED); +listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records); +} +} +} + +private void maybeFireHandleCommit(long baseOffset, int epoch, List records) { Review comment: The only difference is the input. I will add some comments to try and clarify the usage. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514536828 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to save some work +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +continue; +} + +long nextExpectedOffset = 
nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset < highWatermark) { +LogFetchInfo readInfo = log.read(nextExpectedOffset, Isolation.COMMITTED); +listenerContext.fireHandleCommit(nextExpectedOffset, readInfo.records); +} +} +} + +private void maybeFireHandleCommit(long baseOffset, int epoch, List records) { +for (ListenerContext listenerContext : listenerContexts) { +OptionalLong nextExpectedOffsetOpt = listenerContext.nextExpectedOffset(); +if (!nextExpectedOffsetOpt.isPresent()) { +continue; +} + +long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong(); +if (nextExpectedOffset == baseOffset) { +listenerContext.fireHandleCommit(baseOffset, epoch, records); +} +} +} + +private void maybeFireHandleClaim(LeaderState state) { +for (ListenerContext listenerContext : listenerContexts) { +int leaderEpoch = state.epoch(); Review comment: I will move this outside the loop. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514535761 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.raft.ExpirationService; + +import java.util.NavigableMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; + +public class ThresholdPurgatory> implements FuturePurgatory { +private final AtomicLong idGenerator = new AtomicLong(0); +private final ExpirationService expirationService; +private final ConcurrentNavigableMap, CompletableFuture> thresholdMap = +new ConcurrentSkipListMap<>(); + +public ThresholdPurgatory(ExpirationService expirationService) { +this.expirationService = expirationService; +} + +@Override +public CompletableFuture await(T threshold, long maxWaitTimeMs) { +ThresholdKey key = new ThresholdKey<>(idGenerator.incrementAndGet(), threshold); +CompletableFuture future = expirationService.await(maxWaitTimeMs); +thresholdMap.put(key, future); +future.whenComplete((timeMs, exception) -> thresholdMap.remove(key)); +return future; +} + +@Override +public void maybeComplete(T value, long currentTimeMs) { +ThresholdKey maxKey = new ThresholdKey<>(Long.MAX_VALUE, value); +NavigableMap, CompletableFuture> submap = thresholdMap.headMap(maxKey); +for (CompletableFuture completion : submap.values()) { +completion.complete(currentTimeMs); +} +} + +@Override +public void completeAll(long currentTimeMs) { +for (CompletableFuture completion : thresholdMap.values()) { +completion.complete(currentTimeMs); +} +} + +@Override +public void completeAllExceptionally(Throwable exception) { +for (CompletableFuture completion : thresholdMap.values()) { +completion.completeExceptionally(exception); +} +} + +@Override +public int numWaiting() { +return thresholdMap.size(); +} + +private static class ThresholdKey> implements Comparable> { Review comment: Not at the moment. I guess your point is that we might be able to drop the generics, which is fair. 
I think we can also drop the `FuturePurgatory` interface. Is it ok if we save this for a follow-up? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514533712 ## File path: raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java ## @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.protocol.DataInputStreamReadable; +import org.apache.kafka.common.protocol.Readable; +import org.apache.kafka.common.record.BufferSupplier; +import org.apache.kafka.common.record.DefaultRecordBatch; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.MutableRecordBatch; +import org.apache.kafka.common.record.Records; +import org.apache.kafka.raft.BatchReader; +import org.apache.kafka.raft.RecordSerde; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.OptionalLong; + +public class RecordsBatchReader implements BatchReader { Review comment: `MemoryBatchReader` is only used for writes from the leader. 
We retain the original records from the call to `scheduleAppend` and send them to the listener in `handleCommit`. This is useful because it ensures the active controller will not need to read from disk. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lct45 commented on a change in pull request #9487: KAFKA-9331 add a streams handler
lct45 commented on a change in pull request #9487: URL: https://github.com/apache/kafka/pull/9487#discussion_r514527605 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -346,26 +351,89 @@ public void setStateListener(final KafkaStreams.StateListener listener) { * Set the handler invoked when a {@link StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly * terminates due to an uncaught exception. * - * @param eh the uncaught exception handler for all internal threads; {@code null} deletes the current handler + * @param uncaughtExceptionHandler the uncaught exception handler for all internal threads; {@code null} deletes the current handler * @throws IllegalStateException if this {@code KafkaStreams} instance is not in state {@link State#CREATED CREATED}. + * + * @Deprecated Since 2.7.0. Use {@link KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} instead. + * */ -public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler eh) { +public void setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) { synchronized (stateLock) { if (state == State.CREATED) { for (final StreamThread thread : threads) { -thread.setUncaughtExceptionHandler(eh); + thread.setUncaughtExceptionHandler(uncaughtExceptionHandler); } if (globalStreamThread != null) { -globalStreamThread.setUncaughtExceptionHandler(eh); + globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler); } } else { throw new IllegalStateException("Can only set UncaughtExceptionHandler in CREATED state. " + -"Current state is: " + state); +"Current state is: " + state); Review comment: Is this spacing on purpose? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck opened a new pull request #9532: MINOR: Move upgraded docs from site to kafka docs
bbejeck opened a new pull request #9532: URL: https://github.com/apache/kafka/pull/9532 For the 2.7 release, we need to migrate some docs changes that went to `kafka-site` but didn't go into `kafka/docs`. This PR covers the `documentation.html` and `upgrade.html` changes. Once these are merged to trunk, I'll cherry-pick them to the 2.7 branch. *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji opened a new pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization
hachikuji opened a new pull request #9531: URL: https://github.com/apache/kafka/pull/9531 When initializing the raft state machine after shutting down as a leader, we were previously entering the "unattached" state, which means we have no leader and no voted candidate. This was a bug because it allowed a reinitialized leader to cast a vote for a candidate in the same epoch that it was already the leader of. This patch fixes the problem by introducing a new "resigned" state which allows us to retain the leader state so that we cannot change our vote and we will not accept additional appends. This patch also revamps the shutdown logic to make use of the new "resigned" state. Previously we had a separate path in `KafkaRaftClient.poll` for the shutdown logic which resulted in some duplication. Instead now we incorporate shutdown behavior into each state's respective logic. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda merged pull request #9529: Revert initial principal from 2.7
abbccdda merged pull request #9529: URL: https://github.com/apache/kafka/pull/9529 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514518360 ## File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.OptionalLong; + +/** + * This interface is used to send committed data from the {@link RaftClient} + * down to registered {@link RaftClient.Listener} instances. + * + * The advantage of hiding the consumption of committed batches behind an interface + * is that it allows us to push blocking operations such as reads from disk outside + * of the Raft IO thread. This helps to ensure that a slow state machine will not + * affect replication. + * + * @param record type (see {@link org.apache.kafka.raft.RecordSerde}) + */ +public interface BatchReader extends Iterator>, Closeable { + +/** + * Get the base offset of the readable batches. Note that this value is a constant + * which is defined when the {@link BatchReader} instance is constructed. It does + * not change based on reader progress. 
+ * + * @return the base offset + */ +long baseOffset(); + +/** + * Get the last offset of the batch if it is known. When reading from disk, we may + * not know the last offset of a set of records until it has been read from disk. + * In this case, the state machine cannot advance to the next committed data until + * all batches from the {@link BatchReader} instance have been consumed. + * + * @return optional last offset + */ +OptionalLong lastOffset(); + +/** + * Close this reader. It is the responsibility of the {@link RaftClient.Listener} + * to close each reader passed to {@link RaftClient.Listener#handleCommit(BatchReader)}. + */ +@Override +void close(); + +class Batch { +private final long baseOffset; +private final int epoch; +private final List records; + +public Batch(long baseOffset, int epoch, List records) { +this.baseOffset = baseOffset; +this.epoch = epoch; +this.records = records; +} + +public long lastOffset() { +return baseOffset + records.size() - 1; +} + +public long baseOffset() { +return baseOffset; +} + +public List records() { +return records; +} + +public int epoch() { +return epoch; +} + +@Override +public String toString() { +return "Batch(" + +"baseOffset=" + baseOffset + +", epoch=" + epoch + +", records=" + records + +')'; +} + +@Override +public boolean equals(Object o) { +if (this == o) return true; +if (o == null || getClass() != o.getClass()) return false; +Batch batch = (Batch) o; +return baseOffset == batch.baseOffset && +epoch == batch.epoch && +Objects.equals(records, batch.records); Review comment: We are relying on the standard `equals` method. I think it's up to the user of the api to ensure a reasonable implementation if they expect to rely on batch equality. The raft implementation does not set any expectations on record equality, but it is useful in testing where we can control the record type. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10663) Flakey test ConsumerBounceTest#testSeekAndCommitWithBrokerFailures
Boyang Chen created KAFKA-10663: --- Summary: Flakey test ConsumerBounceTest#testSeekAndCommitWithBrokerFailures Key: KAFKA-10663 URL: https://issues.apache.org/jira/browse/KAFKA-10663 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 2.7.0 Reporter: Boyang Chen org.apache.kafka.common.KafkaException: Socket server failed to bind to localhost:40823: Address already in use. at kafka.network.Acceptor.openServerSocket(SocketServer.scala:671) at kafka.network.Acceptor.<init>(SocketServer.scala:539) at kafka.network.SocketServer.createAcceptor(SocketServer.scala:280) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:253) at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251) at kafka.network.SocketServer.startup(SocketServer.scala:125) at kafka.server.KafkaServer.startup(KafkaServer.scala:303) at kafka.utils.TestUtils$.createServer(TestUtils.scala:160) at kafka.utils.TestUtils$.createServer(TestUtils.scala:151) at kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:102) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scal From AK 2.7 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514516570 ## File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft; + +import java.io.Closeable; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.OptionalLong; + +/** + * This interface is used to send committed data from the {@link RaftClient} + * down to registered {@link RaftClient.Listener} instances. + * + * The advantage of hiding the consumption of committed batches behind an interface + * is that it allows us to push blocking operations such as reads from disk outside + * of the Raft IO thread. This helps to ensure that a slow state machine will not + * affect replication. + * + * @param record type (see {@link org.apache.kafka.raft.RecordSerde}) + */ +public interface BatchReader extends Iterator>, Closeable { + +/** + * Get the base offset of the readable batches. Note that this value is a constant + * which is defined when the {@link BatchReader} instance is constructed. It does + * not change based on reader progress. 
+ * + * @return the base offset + */ +long baseOffset(); + +/** + * Get the last offset of the batch if it is known. When reading from disk, we may + * not know the last offset of a set of records until it has been read from disk. + * In this case, the state machine cannot advance to the next committed data until + * all batches from the {@link BatchReader} instance have been consumed. + * + * @return optional last offset + */ +OptionalLong lastOffset(); + +/** + * Close this reader. It is the responsibility of the {@link RaftClient.Listener} + * to close each reader passed to {@link RaftClient.Listener#handleCommit(BatchReader)}. + */ +@Override +void close(); + +class Batch { +private final long baseOffset; +private final int epoch; +private final List records; + +public Batch(long baseOffset, int epoch, List records) { +this.baseOffset = baseOffset; +this.epoch = epoch; +this.records = records; +} + +public long lastOffset() { +return baseOffset + records.size() - 1; +} + +public long baseOffset() { +return baseOffset; +} + +public List records() { +return records; +} + +public int epoch() { +return epoch; +} + +@Override +public String toString() { +return "Batch(" + +"baseOffset=" + baseOffset + +", epoch=" + epoch + +", records=" + records + Review comment: Yeah, I'm relying on the `toString`. I think this is only useful for debugging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514515763 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class DataInputStreamReadable implements Readable, Closeable { +protected final DataInputStream input; + +public DataInputStreamReadable(DataInputStream input) { +this.input = input; +} + +@Override +public byte readByte() { +try { +return input.readByte(); +} catch (IOException e) { Review comment: `IOException` is checked, so we cannot raise it from the current `Readable` interface, so the options are to either add the exception to the `Readable` interface or to rethrow it as an unchecked exception. I went with the latter to reduce the impact and because I think we tend to prefer unchecked exceptions in general since checked exceptions sort of end up leaking their way through a bunch of call stacks. 
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514514051 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class DataInputStreamReadable implements Readable, Closeable { +protected final DataInputStream input; + +public DataInputStreamReadable(DataInputStream input) { +this.input = input; +} + +@Override +public byte readByte() { +try { +return input.readByte(); +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +@Override +public short readShort() { +try { Review comment: Yeah, I could introduce a helper with a lambda, but that would add some unnecessary garbage to the deserialization path. Although it is ugly, I think the duplication is not a big deal. We probably won't touch this class after it is created. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514512213 ## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java ## @@ -113,11 +113,11 @@ protected HistogramSample newSample(long timeMs) { protected void update(Sample sample, MetricConfig config, double value, long timeMs) { final double boundedValue; if (value > max) { -log.warn("Received value {} which is greater than max recordable value {}, will be pinned to the max value", +log.debug("Received value {} which is greater than max recordable value {}, will be pinned to the max value", Review comment: The `warn` seemed excessive for a metric update, which could be done very frequently. I looked over the code and it looks like we don't really use this outside of tests (and now the code added in this patch). I think the user should just understand the contract, which is that anything outside of the specified range gets rounded down. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema
anatasiavela commented on a change in pull request #9526: URL: https://github.com/apache/kafka/pull/9526#discussion_r514508755 ## File path: clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java ## @@ -57,6 +57,10 @@ public AddOffsetsToTxnRequest(Struct struct, short version) { this.data = new AddOffsetsToTxnRequestData(struct, version); } +public AddOffsetsToTxnRequestData data() { +return data; +} Review comment: Yea I definitely agree with wanting consistency. I'll stick to 1) in the PR. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9530: MINOR: Fix version verification in system test
cadonna commented on pull request #9530: URL: https://github.com/apache/kafka/pull/9530#issuecomment-718965450 System tests run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4276/ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
hachikuji commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514504593 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) throws IOException { } } +private void pollListeners() { +// Register any listeners added since the last poll +while (!pendingListeners.isEmpty()) { Review comment: I doubt we would use it in practice, though I guess it would open the door to changing roles dynamically, which might be interesting in the future. That said, it was simple to add and useful in testing since it gave me an easy way to initialize a state where a listener had not caught up. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9530: MINOR: Fix version verification in system test
cadonna commented on pull request #9530: URL: https://github.com/apache/kafka/pull/9530#issuecomment-718961222 This fix needs to be cherry-picked to 2.7 and 2.6 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9530: MINOR: Fix version verification in system test
cadonna commented on pull request #9530: URL: https://github.com/apache/kafka/pull/9530#issuecomment-718960604 Call for review: @lct45 @ableegoldman @vvcephei This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna opened a new pull request #9530: MINOR: Fix version verification in system test
cadonna opened a new pull request #9530: URL: https://github.com/apache/kafka/pull/9530 The system test StreamsUpgradeTest.test_version_probing_upgrade tries to verify the wrong version for version probing. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10662) Possible hanging test in 2.6 on JDK 11
John Roesler created KAFKA-10662: Summary: Possible hanging test in 2.6 on JDK 11 Key: KAFKA-10662 URL: https://issues.apache.org/jira/browse/KAFKA-10662 Project: Kafka Issue Type: Bug Affects Versions: 2.6.1 Reporter: John Roesler Attachments: timeout-1.txt, timeout-2.txt, timeout-4.txt While adding a Jenkinsfile to the 2.6 branch (https://github.com/apache/kafka/pull/9471), I observed that the JDK 11 build specifically hung 3 out of 5 times (and passed within a normal timeframe of 2.5 hours the other two times). I haven't seen similar behavior on any other branch, so there may be something about the 2.6 codebase or the 2.6 tests themselves that interacts poorly with Java 11. I did some analysis on the failing results, and found that in all three hanging cases, all the tests that "STARTED" either "PASSED" or were "SKIPPED". So, I was not able to identify a specific culprit. I've attached the logs for these runs, in case they aid any investigation. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] senthilm-ms commented on pull request #9495: KAFKA-10642: Expose the real stack trace if any exception occurred during SSL Client Trust Verification in extension
senthilm-ms commented on pull request #9495: URL: https://github.com/apache/kafka/pull/9495#issuecomment-718944686 > @rajinisivaram : What do you think? Thanks. @junrao @rajinisivaram - take a look at the stack trace mentioned in the JIRA bug where you don't see the extension/custom implementation stack trace This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
abbccdda commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r514482551 ## File path: core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.raft + +import java.util.concurrent.CompletableFuture + +import kafka.raft.TimingWheelExpirationService.TimerTaskCompletableFuture +import kafka.utils.ShutdownableThread +import kafka.utils.timer.{Timer, TimerTask} +import org.apache.kafka.common.errors.TimeoutException +import org.apache.kafka.raft.ExpirationService + +object TimingWheelExpirationService { + class TimerTaskCompletableFuture[T](override val delayMs: Long) extends CompletableFuture[T] with TimerTask { +override def run(): Unit = { + completeExceptionally(new TimeoutException( +s"Future failed to be completed before timeout of $delayMs ms was reached")) +} + } +} + +class TimingWheelExpirationService(timer: Timer) extends ExpirationService { + private val expirationReaper = new ExpiredOperationReaper() + + expirationReaper.start() + + override def await[T](timeoutMs: Long): CompletableFuture[T] = { +val future = new TimerTaskCompletableFuture[T](timeoutMs) +future.whenComplete { (_, _) => + future.cancel() +} +timer.add(future) +future + } + + private class ExpiredOperationReaper extends ShutdownableThread( +name = "raft-expiration-reaper", isInterruptible = false) { + +override def doWork(): Unit = { + timer.advanceClock(200L) Review comment: Could we make 200L a constant? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #9495: KAFKA-10642: Expose the real stack trace if any exception occurred during SSL Client Trust Verification in extension
junrao commented on pull request #9495: URL: https://github.com/apache/kafka/pull/9495#issuecomment-718942968 @rajinisivaram : What do you think? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines
abbccdda commented on a change in pull request #9482: URL: https://github.com/apache/kafka/pull/9482#discussion_r513200010 ## File path: clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java ## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + +public class DataInputStreamReadable implements Readable, Closeable { +protected final DataInputStream input; + +public DataInputStreamReadable(DataInputStream input) { +this.input = input; +} + +@Override +public byte readByte() { +try { +return input.readByte(); +} catch (IOException e) { +throw new RuntimeException(e); +} +} + +@Override +public short readShort() { +try { Review comment: nit: we could refactor out the try-catch logic. 
## File path: clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java ## @@ -113,11 +113,11 @@ protected HistogramSample newSample(long timeMs) { protected void update(Sample sample, MetricConfig config, double value, long timeMs) { final double boundedValue; if (value > max) { -log.warn("Received value {} which is greater than max recordable value {}, will be pinned to the max value", +log.debug("Received value {} which is greater than max recordable value {}, will be pinned to the max value", Review comment: Why do we change these values to debug? ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) throws IOException { if (!candidateState.isVoteGranted()) throw new IllegalStateException("Cannot become leader without majority votes granted"); +// Note that the leader does not retain the high watermark that was known +// in the previous state. The reason it does not is to protect the monotonicity Review comment: `it does not` could be removed. 
## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp( final LogOffsetMetadata endOffsetMetadata = log.endOffset(); if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) { -updateHighWatermark(state, currentTimeMs); +onUpdateLeaderHighWatermark(state, currentTimeMs); } -LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, Isolation.UNCOMMITTED); -fetchPurgatory.maybeComplete(endOffset, currentTimeMs); +fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs); } -private void updateHighWatermark( -EpochState state, +private void onUpdateLeaderHighWatermark( +LeaderState state, long currentTimeMs ) { state.highWatermark().ifPresent(highWatermark -> { -logger.debug("High watermark updated to {}", highWatermark); +logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); - -LogOffset offset = new LogOffset(highWatermark.offset, Isolation.COMMITTED); -appendPurgatory.maybeComplete(offset, currentTimeMs); -fetchPurgatory.maybeComplete(offset, currentTimeMs); +appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs); +maybeFireHandleCommit(highWatermark.offset); }); } -@Override -public LeaderAndEpoch currentLeaderAndEpoch() { -return quorum.leaderAndEpoch(); +private void maybeFireHandleCommit(long highWatermark) { +maybeFireHandleCommit(listenerContexts, highWatermark); +} + +private void maybeFireHandleCommit(List listenerContexts, long highWatermark) { +// TODO: When there are multiple listeners, we can cache reads to
[GitHub] [kafka] junrao commented on pull request #6915: KAFKA-8334 Executor to retry delayed operations failed to obtain lock
junrao commented on pull request #6915: URL: https://github.com/apache/kafka/pull/6915#issuecomment-718933493 @yinchuanwang : The short answer is that the retry logic could cause a watcher event not to be triggered. If there is no future event triggering the same watcher, the corresponding delayed request may not be completed in time, leading to the request timing out in the client. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
hachikuji commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r514467652 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ## @@ -210,65 +142,42 @@ public String toString() { } } +/** + * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate + * the reference to ByteBuffer but those metadata are still useful. + */ private final short acks; private final int timeout; private final String transactionalId; - -private final Map partitionSizes; - +// visible for testing +final Map partitionSizes; +private boolean hasTransactionalRecords = false; +private boolean hasIdempotentRecords = false; // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is // put in the purgatory (due to client throttling, it can take a while before the response is sent). // Care should be taken in methods that use this field. 
-private volatile Map partitionRecords; -private boolean hasTransactionalRecords = false; -private boolean hasIdempotentRecords = false; - -private ProduceRequest(short version, short acks, int timeout, Map partitionRecords, String transactionalId) { -super(ApiKeys.PRODUCE, version); -this.acks = acks; -this.timeout = timeout; - -this.transactionalId = transactionalId; -this.partitionRecords = partitionRecords; -this.partitionSizes = createPartitionSizes(partitionRecords); +private volatile ProduceRequestData data; -for (MemoryRecords records : partitionRecords.values()) { -setFlags(records); -} -} - -private static Map createPartitionSizes(Map partitionRecords) { -Map result = new HashMap<>(partitionRecords.size()); -for (Map.Entry entry : partitionRecords.entrySet()) -result.put(entry.getKey(), entry.getValue().sizeInBytes()); -return result; -} - -public ProduceRequest(Struct struct, short version) { +public ProduceRequest(ProduceRequestData produceRequestData, short version) { super(ApiKeys.PRODUCE, version); -partitionRecords = new HashMap<>(); -for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { -Struct topicData = (Struct) topicDataObj; -String topic = topicData.get(TOPIC_NAME); -for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) { -Struct partitionResponse = (Struct) partitionResponseObj; -int partition = partitionResponse.get(PARTITION_ID); -MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME); -setFlags(records); -partitionRecords.put(new TopicPartition(topic, partition), records); -} -} -partitionSizes = createPartitionSizes(partitionRecords); -acks = struct.getShort(ACKS_KEY_NAME); -timeout = struct.getInt(TIMEOUT_KEY_NAME); -transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null); -} - -private void setFlags(MemoryRecords records) { -Iterator iterator = records.batches().iterator(); -MutableRecordBatch entry = iterator.next(); -hasIdempotentRecords = 
hasIdempotentRecords || entry.hasProducerId(); -hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional(); +this.data = produceRequestData; +this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions() +.forEach(partitionProduceData -> { +MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records()); +Iterator iterator = records.batches().iterator(); +MutableRecordBatch entry = iterator.next(); +hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId(); Review comment: Nevermind, I guess we have to do it here because the server needs to validate the request received from the client. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10661) Add resigned state to raft state machine to preserve leader/epoch information
Jason Gustafson created KAFKA-10661: --- Summary: Add resigned state to raft state machine to preserve leader/epoch information Key: KAFKA-10661 URL: https://issues.apache.org/jira/browse/KAFKA-10661 Project: Kafka Issue Type: Sub-task Reporter: Jason Gustafson Assignee: Jason Gustafson While working on KAFKA-10655, I realized we have a bug in the existing raft state initialization logic when the process shuts down as leader. After reinitializing, we retain the current epoch, but we discard the current leader status. This means that it is possible for the node to vote for another node in the same epoch that it was the leader of. To fix this problem, I think we should add a separate "resigned" state. When re-initializing after being shut down as leader, we can enter the "resigned" state. This prevents us from voting for another candidate while still ensuring that a new election needs to be held. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9892) Producer state snapshot needs to be forced to disk
[ https://issues.apache.org/jira/browse/KAFKA-9892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223094#comment-17223094 ] Jun Rao commented on KAFKA-9892: [~kbrajesh176]: Thanks for your interest. Just added you to Kafka contributor list. So, feel free to assign this jira to yourself. > Producer state snapshot needs to be forced to disk > -- > > Key: KAFKA-9892 > URL: https://issues.apache.org/jira/browse/KAFKA-9892 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Jun Rao >Priority: Major > > Currently, ProducerStateManager.writeSnapshot() only calls > fileChannel.close(), but not explicitly fileChannel.force(). It seems force() > is not guaranteed to be called on close(). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei merged pull request #9426: MINOR: Fix flaky shouldRejectNonExistentStoreName
vvcephei merged pull request #9426: URL: https://github.com/apache/kafka/pull/9426 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9892) Producer state snapshot needs to be forced to disk
[ https://issues.apache.org/jira/browse/KAFKA-9892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223070#comment-17223070 ] Brajesh Kumar commented on KAFKA-9892: --- [~junrao] [~ijuma] Can I pick this up please? It seems to be a good beginner bug. > Producer state snapshot needs to be forced to disk > -- > > Key: KAFKA-9892 > URL: https://issues.apache.org/jira/browse/KAFKA-9892 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.6.0 >Reporter: Jun Rao >Priority: Major > > Currently, ProducerStateManager.writeSnapshot() only calls > fileChannel.close(), but not explicitly fileChannel.force(). It seems force() > is not guaranteed to be called on close(). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #9426: MINOR: Fix flaky shouldRejectNonExistentStoreName
vvcephei commented on pull request #9426: URL: https://github.com/apache/kafka/pull/9426#issuecomment-718900823 The only build failure was: `Build / JDK 11 / org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization = none]` Which seems to have been caused by the OS deleting some files from the `tmp` directory during the run. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking
[ https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-10638. -- Resolution: Fixed > QueryableStateIntegrationTest fails due to stricter store checking > -- > > Key: KAFKA-10638 > URL: https://issues.apache.org/jira/browse/KAFKA-10638 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.7.0 > > > Observed: > {code:java} > org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state > store source-table because the stream thread is PARTITIONS_ASSIGNED, not > RUNNING > at > org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81) > at > org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50) > at > org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at > org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119) > at
[jira] [Updated] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking
[ https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10638: - Fix Version/s: (was: 2.7.1) (was: 2.8.0) 2.7.0 > QueryableStateIntegrationTest fails due to stricter store checking > -- > > Key: KAFKA-10638 > URL: https://issues.apache.org/jira/browse/KAFKA-10638 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Major > Fix For: 2.7.0 > > > Observed: > {code:java} > org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state > store source-table because the stream thread is PARTITIONS_ASSIGNED, not > RUNNING > at > org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81) > at > org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50) > at > org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52) > at > org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > 
org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51) > at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) > at > org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) > at > org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) > at > org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) > at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) > at >
[jira] [Commented] (KAFKA-9948) Gradle Issue
[ https://issues.apache.org/jira/browse/KAFKA-9948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223044#comment-17223044 ] Brajesh Kumar commented on KAFKA-9948: --- [~dulvinw] Which command are you using to build kafka? As suggested by [~chia7712], you should use "*./gradlew*"; alternatively, can you try running "*gradle build*" instead of just "*gradle*" to build the kafka code base? > Gradle Issue > > > Key: KAFKA-9948 > URL: https://issues.apache.org/jira/browse/KAFKA-9948 > Project: Kafka > Issue Type: Bug > Components: build >Affects Versions: 2.4.1 > Environment: gradle -v > > Gradle 6.0.1 > > Build time: 2019-11-18 20:25:01 UTC > Revision: fad121066a68c4701acd362daf4287a7c309a0f5 > Kotlin: 1.3.50 > Groovy: 2.5.8 > Ant: Apache Ant(TM) version 1.10.7 compiled on September 1 2019 > JVM: 1.8.0_152 (Oracle Corporation 25.152-b16) > OS: Mac OS X 10.15.4 x86_64 >Reporter: Dulvin Witharane >Priority: Blocker > > Can't get Gradle to build kafka. > > Build file '/Users/dulvin/Documents/Work/git/kafka/build.gradle' line: 457 > A problem occurred evaluating root project 'kafka'. > > Could not create task ':clients:spotbugsMain'. > > Could not create task of type 'SpotBugsTask'. > > Could not create an instance of type > com.github.spotbugs.internal.SpotBugsReportsImpl. > > > org.gradle.api.reporting.internal.TaskReportContainer.(Ljava/lang/Class;Lorg/gradle/api/Task;)V > > The above error is thrown -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-9948) Gradle Issue
[ https://issues.apache.org/jira/browse/KAFKA-9948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223044#comment-17223044 ] Brajesh Kumar edited comment on KAFKA-9948 at 10/29/20, 5:02 PM: -- [~dulvinw] Which command are you using to build kafka? As suggested by [~chia7712], you should use "*./gradlew*"; alternatively, can you try running "*gradle build*" instead of just "*gradle*" to build the kafka code base? On my Amazon Linux system "*gradle build*" works fine too. was (Author: kbrajesh176): [~dulvinw] Which command are you using to build kafka? As suggested by [~chia7712], you should use "*./gradlew*"; alternatively, can you try running "*gradle build*" instead of just "*gradle*" to build the kafka code base? > Gradle Issue > > > Key: KAFKA-9948 > URL: https://issues.apache.org/jira/browse/KAFKA-9948 > Project: Kafka > Issue Type: Bug > Components: build >Affects Versions: 2.4.1 > Environment: gradle -v > > Gradle 6.0.1 > > Build time: 2019-11-18 20:25:01 UTC > Revision: fad121066a68c4701acd362daf4287a7c309a0f5 > Kotlin: 1.3.50 > Groovy: 2.5.8 > Ant: Apache Ant(TM) version 1.10.7 compiled on September 1 2019 > JVM: 1.8.0_152 (Oracle Corporation 25.152-b16) > OS: Mac OS X 10.15.4 x86_64 >Reporter: Dulvin Witharane >Priority: Blocker > > Can't get Gradle to build kafka. > > Build file '/Users/dulvin/Documents/Work/git/kafka/build.gradle' line: 457 > A problem occurred evaluating root project 'kafka'. > > Could not create task ':clients:spotbugsMain'. > > Could not create task of type 'SpotBugsTask'. > > Could not create an instance of type > com.github.spotbugs.internal.SpotBugsReportsImpl. > > > org.gradle.api.reporting.internal.TaskReportContainer.(Ljava/lang/Class;Lorg/gradle/api/Task;)V > > The above error is thrown -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup
vvcephei commented on pull request #9414: URL: https://github.com/apache/kafka/pull/9414#issuecomment-718889494 Hey @dongjinleekr , just a quick check-in: do you plan to take a look at the test failures? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-10373) Kafka Reassign Partition is stuck with Java OutOfMemory error
[ https://issues.apache.org/jira/browse/KAFKA-10373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223041#comment-17223041 ] Brajesh Kumar commented on KAFKA-10373: Hi What is your Environment configurations(Linux/MAC OS)? Do you have any ACL set on the cluster? Can you please share the steps to reproduce the issue. I am unable to reproduce the issue on Amazon Linux. {code:java} dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file topics-reassignment.json --bootstrap-server localhost:9093 --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions. (20-10-29 16:55:05) <0> [~/kafka/kafka_2.11-2.2.1/bin] dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file topics-reassignment.json --bootstrap-server localhost:9093 --verify Status of partition reassignment: Reassignment of partition test-1 completed successfully Reassignment of partition test-3 completed successfully Reassignment of partition test-0 completed successfully Reassignment of partition test-2 completed successfully{code} > Kafka Reassign Partition is stuck with Java OutOfMemory error > - > > Key: KAFKA-10373 > URL: https://issues.apache.org/jira/browse/KAFKA-10373 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.1 >Reporter: azher khan >Priority: Major > > Hi Team, > While trying to run the Kafka script to reassign partitions of an existing > topic, we are seeing a Java OutOfMemory issue. 
> > The heap for Kafka is set to "-Xmx1G -Xms1G" on the kafka broker. > > {code:java} > /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 > --reassignment-json-file topic_kafka_topic1_reassignment.json > --bootstrap-server kafkabroker1:9092 --verify > Status of partition reassignment: > [2020-08-07 XX:XX:XX,] ERROR Uncaught exception in thread > 'kafka-admin-client-thread | reassign-partitions-tool': > (org.apache.kafka.common.utils.KafkaThread) > java.lang.OutOfMemoryError: Java heap space > at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) > at > org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) > at > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) > at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) > at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) > at > org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116) > at java.lang.Thread.run(Thread.java:748) > Reassignment of partition kafka_topic1-0 is still in progress > Reassignment of partition kafka_topic1-1 is still in progress > Reassignment of partition kafka_topic1-2 is still in progress{code} > > Retried the above command after removing the "reassign_partitions" from > zookeeper as suggested but we are seeing the same error. 
> > > {code:java} > [zk: localhost:2181(CONNECTED) 5] delete /admin/reassign_partitions > [zk: localhost:2181(CONNECTED) 7] ls /admin > [delete_topics] > {code} > > Would highly appreciate your advice, > Thank you in advance, > > Regards, > Azher Khan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-10373) Kafka Reassign Partition is stuck with Java OutOfMemory error
[ https://issues.apache.org/jira/browse/KAFKA-10373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223041#comment-17223041 ] Brajesh Kumar edited comment on KAFKA-10373 at 10/29/20, 4:59 PM: --- Hi What is your Environment configurations(Linux/MAC OS)? Do you have any ACL set on the cluster? [~azherullahkhan] Can you please share the steps to reproduce the issue. I am unable to reproduce the issue on Amazon Linux. {code:java} dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file topics-reassignment.json --bootstrap-server localhost:9093 --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions. (20-10-29 16:55:05) <0> [~/kafka/kafka_2.11-2.2.1/bin] dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file topics-reassignment.json --bootstrap-server localhost:9093 --verify Status of partition reassignment: Reassignment of partition test-1 completed successfully Reassignment of partition test-3 completed successfully Reassignment of partition test-0 completed successfully Reassignment of partition test-2 completed successfully{code} was (Author: kbrajesh176): Hi What is your Environment configurations(Linux/MAC OS)? Do you have any ACL set on the cluster? Can you please share the steps to reproduce the issue. I am unable to reproduce the issue on Amazon Linux. 
{code:java} dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file topics-reassignment.json --bootstrap-server localhost:9093 --execute Current partition replica assignment {"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions. (20-10-29 16:55:05) <0> [~/kafka/kafka_2.11-2.2.1/bin] dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file topics-reassignment.json --bootstrap-server localhost:9093 --verify Status of partition reassignment: Reassignment of partition test-1 completed successfully Reassignment of partition test-3 completed successfully Reassignment of partition test-0 completed successfully Reassignment of partition test-2 completed successfully{code} > Kafka Reassign Partition is stuck with Java OutOfMemory error > - > > Key: KAFKA-10373 > URL: https://issues.apache.org/jira/browse/KAFKA-10373 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.2.1 >Reporter: azher khan >Priority: Major > > Hi Team, > While trying to run the Kafka script to reassign partitions of an existing > topic, we are seeing a Java OutOfMemory issue. > > The heap for Kafka is set to "-Xmx1G -Xms1G" on the kafka broker. 
> > {code:java} > /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 > --reassignment-json-file topic_kafka_topic1_reassignment.json > --bootstrap-server kafkabroker1:9092 --verify > Status of partition reassignment: > [2020-08-07 XX:XX:XX,] ERROR Uncaught exception in thread > 'kafka-admin-client-thread | reassign-partitions-tool': > (org.apache.kafka.common.utils.KafkaThread) > java.lang.OutOfMemoryError: Java heap space > at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57) > at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) > at > org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) > at > org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) > at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) > at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) > at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) > at >
[GitHub] [kafka] vvcephei merged pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest
vvcephei merged pull request #9521: URL: https://github.com/apache/kafka/pull/9521 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest
vvcephei commented on pull request #9521: URL: https://github.com/apache/kafka/pull/9521#issuecomment-718885978 The tests passed, and I answered the two review questions. I'll go ahead and merge. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values
vvcephei commented on pull request #9467: URL: https://github.com/apache/kafka/pull/9467#issuecomment-718882348 _Finally_ merged that Jenkinsfile PR (seeing a lot of timeouts, but decided just to go ahead and merge it). Merged in the 2.6 branch, and now the tests are running! We should hopefully get to merge this today. Thanks again, @thake This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei merged pull request #9471: MINOR: Add Jenkinsfile to 2.6
vvcephei merged pull request #9471: URL: https://github.com/apache/kafka/pull/9471 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-10660) Poll time out logstash
David created KAFKA-10660: - Summary: Poll time out logstash Key: KAFKA-10660 URL: https://issues.apache.org/jira/browse/KAFKA-10660 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.2.1 Environment: Non Production Reporter: David I am getting the message below (logstash log from the kafka input), for which I believe I need to increase max.poll.interval.ms (I think the default is 3): This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] abbccdda commented on pull request #9529: Revert initial principal from 2.7
abbccdda commented on pull request #9529: URL: https://github.com/apache/kafka/pull/9529#issuecomment-718871156 @bbejeck @hachikuji for a review This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda opened a new pull request #9529: Revert initial principal from 2.7
abbccdda opened a new pull request #9529: URL: https://github.com/apache/kafka/pull/9529 Need to make sure 2.7 doesn't bump the request header unexpectedly. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on pull request #9522: MINOR: revert initial principal PR
abbccdda commented on pull request #9522: URL: https://github.com/apache/kafka/pull/9522#issuecomment-718869803 Reopening this PR to merge to 2.7 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] bbejeck opened a new pull request #9528: MINOR: Add releaseTarGz to args for building docs
bbejeck opened a new pull request #9528: URL: https://github.com/apache/kafka/pull/9528 This PR adds the `releaseTarGz` command back into the `release.py::command_stage_docs` method when building docs for staging *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
hachikuji commented on pull request #9401: URL: https://github.com/apache/kafka/pull/9401#issuecomment-718828042 @chia7712 One thing that would be useful is running the producer-performance test, just to make sure the performance is in line. Might be worth checking flame graphs as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol
hachikuji commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r514326022 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ## @@ -210,65 +142,42 @@ public String toString() { } } +/** + * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate + * the reference to ByteBuffer but those metadata are still useful. + */ private final short acks; private final int timeout; private final String transactionalId; - -private final Map partitionSizes; - +// visible for testing +final Map partitionSizes; +private boolean hasTransactionalRecords = false; +private boolean hasIdempotentRecords = false; // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is // put in the purgatory (due to client throttling, it can take a while before the response is sent). // Care should be taken in methods that use this field. 
-private volatile Map partitionRecords; -private boolean hasTransactionalRecords = false; -private boolean hasIdempotentRecords = false; - -private ProduceRequest(short version, short acks, int timeout, Map partitionRecords, String transactionalId) { -super(ApiKeys.PRODUCE, version); -this.acks = acks; -this.timeout = timeout; - -this.transactionalId = transactionalId; -this.partitionRecords = partitionRecords; -this.partitionSizes = createPartitionSizes(partitionRecords); +private volatile ProduceRequestData data; -for (MemoryRecords records : partitionRecords.values()) { -setFlags(records); -} -} - -private static Map createPartitionSizes(Map partitionRecords) { -Map result = new HashMap<>(partitionRecords.size()); -for (Map.Entry entry : partitionRecords.entrySet()) -result.put(entry.getKey(), entry.getValue().sizeInBytes()); -return result; -} - -public ProduceRequest(Struct struct, short version) { +public ProduceRequest(ProduceRequestData produceRequestData, short version) { super(ApiKeys.PRODUCE, version); -partitionRecords = new HashMap<>(); -for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { -Struct topicData = (Struct) topicDataObj; -String topic = topicData.get(TOPIC_NAME); -for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) { -Struct partitionResponse = (Struct) partitionResponseObj; -int partition = partitionResponse.get(PARTITION_ID); -MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME); -setFlags(records); -partitionRecords.put(new TopicPartition(topic, partition), records); -} -} -partitionSizes = createPartitionSizes(partitionRecords); -acks = struct.getShort(ACKS_KEY_NAME); -timeout = struct.getInt(TIMEOUT_KEY_NAME); -transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null); -} - -private void setFlags(MemoryRecords records) { -Iterator iterator = records.batches().iterator(); -MutableRecordBatch entry = iterator.next(); -hasIdempotentRecords = 
hasIdempotentRecords || entry.hasProducerId(); -hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional(); +this.data = produceRequestData; +this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions() +.forEach(partitionProduceData -> { +MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records()); +Iterator iterator = records.batches().iterator(); +MutableRecordBatch entry = iterator.next(); +hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId(); Review comment: Would it make sense to move this to the builder where we are already doing a pass over the partitions? ## File path: clients/src/main/resources/common/message/ProduceRequest.json ## @@ -33,21 +33,21 @@ "validVersions": "0-8", "flexibleVersions": "none", "fields": [ -{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId", +{ "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "entityType": "transactionalId", "about": "The transactional ID, or null if the producer is not transactional." }, { "name": "Acks", "type": "int16", "versions": "0+", "about": "The number of acknowledgments the producer requires the leader to have received before considering a
[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6
vvcephei commented on pull request #9471: URL: https://github.com/apache/kafka/pull/9471#issuecomment-718826291 Ok, I've run it a total of 5 times now, and it has passed two times and timed out three times. I did some analysis on the timed out logs, and found that every test that started either passed or got skipped. I was hoping to find a test that hung, but no luck. For example, I downloaded the logs for run 1, which timed out as `timeout-1.txt`: ```bash [john@arcturus Downloads]$ cat timeout-1.txt | grep STARTED | sed 's|^\[\S*] ||' | sed 's/ STARTED//' | sort > /tmp/1-started [john@arcturus Downloads]$ cat timeout-1.txt | grep PASSED | sed 's|^\[\S*] ||' | sed 's/ PASSED//' | sort > /tmp/1-passed [john@arcturus Downloads]$ cat timeout-1.txt | grep SKIPPED | sed 's|^\[\S*] ||' | sed 's/ SKIPPED//' | sort > /tmp/1-skipped [john@arcturus Downloads]$ wc -l /tmp/1-skipped /tmp/1-passed 61 /tmp/1-skipped 10891 /tmp/1-passed 10952 total [john@arcturus Downloads]$ wc -l /tmp/1-started 10952 /tmp/1-started ``` Since all the other builds in all the other branches are working with effectively the same Jenkinsfile, I'm deeply suspicious that there actually is something wrong with the 2.6 codebase and not this PR itself. I think we should go ahead and merge this in some form. I could see: 1. Just merge this as-is and file a Jira ticket to investigate why java 11 builds time out on 2.6. 2. Exclude java 11 from the jenkinsfile and go ahead and merge with just java 8 and 14. WDYT, @omkreddy @jolshan @ijuma ? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vvcephei commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest
vvcephei commented on a change in pull request #9521: URL: https://github.com/apache/kafka/pull/9521#discussion_r514325710 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java ## @@ -153,51 +179,75 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception { // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); -final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); - -//key belongs to this partition -final int keyPartition = keyQueryMetadata.partition(); - -//key doesn't belongs to this partition -final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0; -final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1; - -StoreQueryParameters> storeQueryParam = -StoreQueryParameters.>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()) -.withPartition(keyPartition); -ReadOnlyKeyValueStore store1 = null; -ReadOnlyKeyValueStore store2 = null; -if (kafkaStreams1IsActive) { -store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam); -} else { -store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam); -} - -if (kafkaStreams1IsActive) { -assertThat(store1, is(notNullValue())); -assertThat(store2, is(nullValue())); -} else { -assertThat(store2, is(notNullValue())); -assertThat(store1, is(nullValue())); -} +until(() -> { +final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + +//key belongs to this partition +final int keyPartition = keyQueryMetadata.partition(); + +//key doesn't belongs to this partition +final int keyDontBelongPartition = (keyPartition == 0) ? 
1 : 0; +final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1; + +final StoreQueryParameters> storeQueryParam = +StoreQueryParameters.>fromNameAndType(TABLE_NAME, keyValueStore()) +.withPartition(keyPartition); +ReadOnlyKeyValueStore store1 = null; +ReadOnlyKeyValueStore store2 = null; +if (kafkaStreams1IsActive) { +store1 = getStore(kafkaStreams1, storeQueryParam); +} else { +store2 = getStore(kafkaStreams2, storeQueryParam); +} + +if (kafkaStreams1IsActive) { +assertThat(store1, is(notNullValue())); +assertThat(store2, is(nullValue())); +} else { +assertThat(store2, is(notNullValue())); +assertThat(store1, is(nullValue())); +} + +// Assert that only active for a specific requested partition serves key if stale stores and not enabled +assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); + +final StoreQueryParameters> storeQueryParam2 = +StoreQueryParameters.>fromNameAndType(TABLE_NAME, keyValueStore()) +.withPartition(keyDontBelongPartition); -// Assert that only active for a specific requested partition serves key if stale stores and not enabled -assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); -storeQueryParam = StoreQueryParameters.>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore()) -.withPartition(keyDontBelongPartition); -ReadOnlyKeyValueStore store3 = null; -ReadOnlyKeyValueStore store4 = null; -if (!kafkaStreams1IsActive) { -store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam); -} else { -store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam); -} -// Assert that key is not served when wrong specific partition is requested -// If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition -// So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested -assertThat(kafkaStreams1IsActive ? 
store4.get(key) : store3.get(key), is(nullValue())); +try { +// Assert that key is not served when wrong specific partition is requested +// If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition +
[GitHub] [kafka] vvcephei commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest
vvcephei commented on a change in pull request #9521: URL: https://github.com/apache/kafka/pull/9521#discussion_r514323639 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java ## @@ -117,17 +126,34 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception { // Assert that all messages in the first batch were processed in a timely manner assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true))); -final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); - -final QueryableStoreType> queryableStoreType = QueryableStoreTypes.keyValueStore(); -final ReadOnlyKeyValueStore store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType); -final ReadOnlyKeyValueStore store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType); - -final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1; - -// Assert that only active is able to query for a key by default -assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue())); -assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue())); +until(() -> { + +final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0); + +final QueryableStoreType> queryableStoreType = keyValueStore(); +final ReadOnlyKeyValueStore store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType); +final ReadOnlyKeyValueStore store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType); + +final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1; + +// Assert that only active is able to query for a key by default +assertThat(kafkaStreams1IsActive ? 
store1.get(key) : store2.get(key), is(notNullValue())); +try { +if (kafkaStreams1IsActive) { +assertThat(store2.get(key), is(nullValue())); +} else { +assertThat(store1.get(key), is(nullValue())); +} +return true; +} catch (final InvalidStateStoreException exception) { Review comment: I wanted to keep the concerns separate, so that unexpected exceptions would cause the test to fail fast. The idea is that `until` is the inverse of `while`, namely, it just loops as long as the condition evaluates to `false`. If the condition throws an exception, then the loop also throws, just like the real `while` loop. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org