[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514882473



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+    request match {
+      case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version())
+      case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+      case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+      case res: DescribeUserScramCredentialsRequest =>

[GitHub] [kafka] ning2008wisc commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-10-29 Thread GitBox


ning2008wisc commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-719201216


   A kind reminder for additional feedback and comments, based on my response 
above



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514872477



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match {
+    case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build()
+    case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+    case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version)
+    case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version)
+    case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version)
+    case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version)
+    case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version)
+    case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version)
+    case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version)
+    case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version)

Review comment:
   Actually, I found a workaround for this mapping and removed it altogether, so there'd be one less ApiKeys mapping to maintain.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match {
+    case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build()
+    case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+    case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version)
+    case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version)
+    case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version)
+    case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version)
+    case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version)
+    case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version)
+    case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version)
+    case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version)

Review comment:
   I guess this is related to the inconsistency issue, but I tried not to make too many changes. The constructors with the `data` parameters were private in some cases, so I used `toStruct`. But for consistency, I'll change to use the Builder class instead.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): AbstractRequest = apiKey match {
+    case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 1, new HashMap[TopicPartition, MemoryRecords]()).build()
+    case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+    case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new ListOffsetRequestData().toStruct(version), version)
+    case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), version)
+    case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new OffsetCommitRequestData(), version)
+    case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new OffsetFetchRequestData().toStruct(version), version)
+    case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new FindCoordinatorRequestData().toStruct(version), version)
+    case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new JoinGroupRequestData(), version)
+    case ApiKeys.HEARTBEAT => new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version)
+    case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new LeaveGroupRequestData().toStruct(version), version)

Review comment:
   I guess this is related to the inconsistency issue, but I tried not to make too many changes. The constructors with the `data` parameters were private in some cases, so I used `toStruct`. But for consistency, I'll change to use the Builder class instead.
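
   For illustration only (this is not code from the PR), a minimal Scala sketch of the two construction styles being discussed, assuming the 2.7-era `HeartbeatRequest` API used in the test above:

```scala
import org.apache.kafka.common.message.HeartbeatRequestData
import org.apache.kafka.common.requests.HeartbeatRequest

object RequestBuildStyles {
  val version: Short = 4
  val data = new HeartbeatRequestData()
    .setGroupId("group")
    .setMemberId("member")
    .setGenerationId(0)

  // Style the test used first: serialize the generated data class to a Struct
  // and pass it to the (Struct, version) constructor, since some data-based
  // constructors were private at the time.
  val viaStruct = new HeartbeatRequest(data.toStruct(version), version)

  // Style proposed here for consistency: go through the public Builder.
  val viaBuilder = new HeartbeatRequest.Builder(data).build(version)
}
```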





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514861991



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+    request match {
+      case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version())
+      case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+      case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+      case res: DescribeUserScramCredentialsRequest =>

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514861389



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+    request match {
+      case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version())
+      case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+      case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+      case res: DescribeUserScramCredentialsRequest =>

[GitHub] [kafka] chia7712 commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-10-29 Thread GitBox


chia7712 commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514819326



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -210,65 +142,42 @@ public String toString() {
 }
 }
 
+    /**
+     * We have to copy acks, timeout, transactionalId and partitionSizes from data since data may be reset to eliminate
+     * the reference to ByteBuffer but those metadata are still useful.
+     */
     private final short acks;
     private final int timeout;
     private final String transactionalId;
-
-    private final Map<TopicPartition, Integer> partitionSizes;
-
+    // visible for testing
+    final Map<TopicPartition, Integer> partitionSizes;
+    private boolean hasTransactionalRecords = false;
+    private boolean hasIdempotentRecords = false;
     // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
     // put in the purgatory (due to client throttling, it can take a while before the response is sent).
     // Care should be taken in methods that use this field.
-    private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
-    private boolean hasTransactionalRecords = false;
-    private boolean hasIdempotentRecords = false;
-
-    private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) {
-        super(ApiKeys.PRODUCE, version);
-        this.acks = acks;
-        this.timeout = timeout;
-
-        this.transactionalId = transactionalId;
-        this.partitionRecords = partitionRecords;
-        this.partitionSizes = createPartitionSizes(partitionRecords);
+    private volatile ProduceRequestData data;
 
-        for (MemoryRecords records : partitionRecords.values()) {
-            setFlags(records);
-        }
-    }
-
-    private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
-        Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size());
-        for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
-            result.put(entry.getKey(), entry.getValue().sizeInBytes());
-        return result;
-    }
-
-    public ProduceRequest(Struct struct, short version) {
+    public ProduceRequest(ProduceRequestData produceRequestData, short version) {
         super(ApiKeys.PRODUCE, version);
-        partitionRecords = new HashMap<>();
-        for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                setFlags(records);
-                partitionRecords.put(new TopicPartition(topic, partition), records);
-            }
-        }
-        partitionSizes = createPartitionSizes(partitionRecords);
-        acks = struct.getShort(ACKS_KEY_NAME);
-        timeout = struct.getInt(TIMEOUT_KEY_NAME);
-        transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-    }
-
-    private void setFlags(MemoryRecords records) {
-        Iterator<MutableRecordBatch> iterator = records.batches().iterator();
-        MutableRecordBatch entry = iterator.next();
-        hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-        hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional();
+        this.data = produceRequestData;
+        this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions()
+            .forEach(partitionProduceData -> {
+                MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records());
+                Iterator<MutableRecordBatch> iterator = records.batches().iterator();
+                MutableRecordBatch entry = iterator.next();
+                hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();

Review comment:
   The clients module has some tests which depend on it, so I moved the helper to ```RequestUtils```.
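
   Not the actual helper (the `RequestUtils` method referenced above lives in the Java clients module and isn't quoted here); a hedged Scala sketch of the equivalent batch-scanning logic, assuming `records()` returns a ByteBuffer as in the constructor above:

```scala
import org.apache.kafka.common.message.ProduceRequestData
import org.apache.kafka.common.record.MemoryRecords

import scala.jdk.CollectionConverters._

object ProduceFlagsSketch {
  // Inspect the first batch of every partition and report whether any batch
  // carries a producer id (idempotent) or is transactional.
  def flags(data: ProduceRequestData): (Boolean, Boolean) = {
    var hasIdempotent = false
    var hasTransactional = false
    for {
      topicData <- data.topicData.asScala
      partitionData <- topicData.partitions.asScala
    } {
      val batches = MemoryRecords.readableRecords(partitionData.records).batches.iterator
      if (batches.hasNext) {
        val entry = batches.next()
        hasIdempotent = hasIdempotent || entry.hasProducerId
        hasTransactional = hasTransactional || entry.isTransactional
      }
    }
    (hasIdempotent, hasTransactional)
  }
}
```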





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #9535: MINOR: remove redundant return statement

2020-10-29 Thread GitBox


dengziming opened a new pull request #9535:
URL: https://github.com/apache/kafka/pull/9535


   The result of `GroupMetadataManager.storeOffsets` is `Unit`, so remove the redundant return statement.
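
   A minimal illustrative sketch (not the actual `GroupMetadataManager` code) of the kind of cleanup this PR makes:

```scala
object RedundantReturnSketch {
  // Before: `return` adds nothing because the result type is already Unit.
  def storeOffsetsBefore(): Unit = {
    println("append offsets to the __consumer_offsets log") // stand-in for the real work
    return
  }

  // After: simply let the body end.
  def storeOffsetsAfter(): Unit =
    println("append offsets to the __consumer_offsets log")
}
```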



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-6217) Allow consumers to read messages from LEO

2020-10-29 Thread Ashok Bala (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ashok Bala updated KAFKA-6217:
--
Description: 
we have a use case(real time customer facing application) to support high 
throughput(reduced latency) & high volume of transactions through Kafka. During 
performance testing, we noted that consumer listening to single replicated 
partition topics get messages faster than the consumers listening to replicated 
partitions.

Make messages to be visible immediately to consumer rather than waiting for 
high water mark. Make change in consumer & core, based on consumer 
configuration, LEO messages to be visible to consumer like followers. It is the 
risk of consumer to lose messages during fail over of leader for which we are 
OK.

  was:
we have a use case(real time customer facing application) to support high 
throughput(reduced latency) & high volume of transactions through Kafka. During 
performance testing, we noted that consumer listening to single partition 
topics get messages faster than the consumers listening to replicated 
partitions. 

Make messages to be visible immediately to consumer rather than waiting for 
high water mark. Make change in consumer & core, based on consumer 
configuration, LEO messages to be visible to consumer like followers. It is the 
risk of consumer to lose messages during fail over of leader for which we are 
OK.



> Allow consumers to read messages from LEO
> -
>
> Key: KAFKA-6217
> URL: https://issues.apache.org/jira/browse/KAFKA-6217
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, core
>Reporter: Ashok Bala
>Priority: Major
>
> we have a use case(real time customer facing application) to support high 
> throughput(reduced latency) & high volume of transactions through Kafka. 
> During performance testing, we noted that consumer listening to single 
> replicated partition topics get messages faster than the consumers listening 
> to replicated partitions.
> Make messages to be visible immediately to consumer rather than waiting for 
> high water mark. Make change in consumer & core, based on consumer 
> configuration, LEO messages to be visible to consumer like followers. It is 
> the risk of consumer to lose messages during fail over of leader for which we 
> are OK.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman opened a new pull request #9534: KAFKA-10664: Delete existing checkpoint when writing empty offsets

2020-10-29 Thread GitBox


ableegoldman opened a new pull request #9534:
URL: https://github.com/apache/kafka/pull/9534


   ...otherwise we can get stuck in an endless loop of initializing corrupted 
offsets, hitting OffsetOutOfRangeException and closing the task, then reviving 
the task with those same corrupted offsets.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2020-10-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223324#comment-17223324
 ] 

Matthias J. Sax commented on KAFKA-10635:
-

\cc [~hachikuji] [~bob-barrett]

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from upgrading our broker version. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10664) Streams fails to overwrite corrupted offsets leading to infinite OffsetOutOfRangeException loop

2020-10-29 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-10664:
--

 Summary: Streams fails to overwrite corrupted offsets leading to 
infinite OffsetOutOfRangeException loop
 Key: KAFKA-10664
 URL: https://issues.apache.org/jira/browse/KAFKA-10664
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.7.0
Reporter: A. Sophie Blee-Goldman
Assignee: A. Sophie Blee-Goldman
 Fix For: 2.7.0


In KAFKA-10391 we fixed an issue where Streams could get stuck in an infinite 
loop of  OffsetOutOfRangeException/TaskCorruptedException due to 
re-initializing the corrupted offsets from the checkpoint after each revival. 
The fix we applied was to remove the corrupted offsets from the state manager 
and then force it to write a new checkpoint file without those offsets during 
revival.

Unfortunately we missed that there's an optimization in OffsetCheckpoint#write to just return without writing anything when there are no offsets. So if a task doesn't have any offsets that _aren't_ corrupted, it will skip overwriting the corrupted checkpoint.

Probably we should just fix the optimization in OffsetCheckpoint so that it deletes the current checkpoint file when there are no offsets to write.
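
A rough Scala-flavoured sketch of the proposed write-path behaviour, for illustration only (the real OffsetCheckpoint is a Java class in the streams module):

```scala
import java.io.File
import java.nio.file.Files

import org.apache.kafka.common.TopicPartition

object CheckpointWriteSketch {
  // When there is nothing to write, delete any existing checkpoint instead of
  // returning early and leaving the stale (corrupted) offsets on disk.
  def write(file: File, offsets: Map[TopicPartition, Long]): Unit = {
    if (offsets.isEmpty) {
      Files.deleteIfExists(file.toPath)
    } else {
      // ... write the offsets atomically, as OffsetCheckpoint already does ...
    }
  }
}
```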



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10645) Forwarding a record from a punctuator sometimes it results in a NullPointerException

2020-10-29 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10645?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17222393#comment-17222393
 ] 

Matthias J. Sax edited comment on KAFKA-10645 at 10/30/20, 1:31 AM:


[~showuon] While my PR adds an additional check, it is still unclear how a `null` header was added in the first place.
{quote}by passing the headers Array/Iterable to the constructor.
{quote}
Yes, but if the input record does not have any headers, the array should have zero entries. It's still unclear to me where a `null` entry comes from.

[~filmac79] I have no idea atm what could cause the `ArrayIndexOutOfBoundsException`. In L67 we call `headers.add()`, which should add the header to the end of the list. It seems that `ArrayList.add` does something funny? There should not be any synchronization issue, as punctuate() and process() are called by the same thread.

Well, there was some other issue with regard to concurrency though: https://issues.apache.org/jira/browse/KAFKA-9584 – maybe this is the root cause but you just hit a different exception?

 


was (Author: mjsax):
[~showuon] While my PR adds an additional check, it is still unclear how a 
`null` header was added in the first place?
{quote}by passing the headers Array/Iterable to the constructor.
{quote}
Yes, but if the input records does not have any headers, the array should have 
zero entries? It's still unclear to me, there a `null` entry comes from?

[~filmac79] I have no idea atm what could cause the 
`ArrayIndexOutOfBoundsException` atm.  In L67 we call `headers.add()` that 
should add the header to the end of the list. It seem that the `ArrayList.add` 
does something funny? There should not be any synchronization issue, as the 
punctuate() and process() are called by the same thread?

Well, there was some other issue with regard to concurrency though: 
https://issues.apache.org/jira/browse/KAFKA-9584 – maybe this is the root cause 
but you just hit a different exception?

 

> Forwarding a record from a punctuator sometimes it results in a 
> NullPointerException
> 
>
> Key: KAFKA-10645
> URL: https://issues.apache.org/jira/browse/KAFKA-10645
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Filippo Machi
>Assignee: Matthias J. Sax
>Priority: Major
>
> Hello,
>  I am working on a Java Kafka Streams application (v. 2.5.0) running on a
> Kubernetes cluster.
> It's a Spring Boot application running on Java 8.
> With the last upgrade to version 2.5.0 I started to see in the logs some
> NullPointerExceptions that happen when forwarding a record from a
> punctuator.
>  This is the stacktrace of the exception
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_2] Abort 
> sending since an error caught with a previous record (timestamp 
> 1603721062667) to topic reply-reminder-push-sender due to 
> java.lang.NullPointerException\tat 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:240)\tat
>  
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:111)\tat
>  
> org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)\tat
>  
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)\t...
>  24 common frames omittedCaused by: java.lang.NullPointerException: null\tat 
> org.apache.kafka.common.record.DefaultRecord.sizeOf(DefaultRecord.java:613)\tat
>  
> org.apache.kafka.common.record.DefaultRecord.recordSizeUpperBound(DefaultRecord.java:633)\tat
>  
> org.apache.kafka.common.record.DefaultRecordBatch.estimateBatchSizeUpperBound(DefaultRecordBatch.java:534)\tat
>  
> org.apache.kafka.common.record.AbstractRecords.estimateSizeInBytesUpperBound(AbstractRecords.java:135)\tat
>  
> org.apache.kafka.common.record.AbstractRecords.estimateSizeInBytesUpperBound(AbstractRecords.java:125)\tat
>  
> org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:914)\tat
>  
> org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)\tat
>  
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:181)\t...
>  29 common frames omitted
> {code}
> Checking the code, it looks like it happens when calculating the size of the
> record. There is one header that is null, but I don't think I can control
> those headers, right?
> Thanks a 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514636997



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+    request match {
+      case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version())
+      case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+      case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+      case res: DescribeUserScramCredentialsRequest =>

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514636692



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -162,7 +161,7 @@ object RequestChannel extends Logging {
   }
 }
 
-trace(s"Processor $processor received request: ${requestDesc(true)}")
+trace(s"Processor $processor received request: 
${RequestConvertToJson.requestDesc(header, loggableRequest, true).toString}")

Review comment:
   The macro does check `isTraceEnabled`, and this check is done before the 
string is evaluated, so it wouldn't be unnecessarily computed. But I'm also 
fine with gating it.
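
   For illustration only (not the PR's code), a small self-contained Scala sketch of why the by-name message makes the explicit gate optional; the `trace` helper below is a hypothetical stand-in for Kafka's `Logging` trait:

```scala
object TraceGatingSketch {
  // Hypothetical stand-ins; only the laziness of the message matters here.
  val isTraceEnabled: Boolean = false
  def trace(msg: => String): Unit = if (isTraceEnabled) println(msg)

  def expensiveDescription(): String = {
    println("computing JSON description") // only prints if the message is actually evaluated
    """{"requestApiKey": 0}"""
  }

  def main(args: Array[String]): Unit = {
    // Option 1: rely on the by-name parameter; trace() checks isTraceEnabled
    // before touching `msg`, so the JSON conversion never runs here.
    trace(s"received request: ${expensiveDescription()}")

    // Option 2: gate explicitly at the call site, as suggested in the review.
    if (isTraceEnabled)
      trace(s"received request: ${expensiveDescription()}")
  }
}
```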





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


lbradstreet commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514635784



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+    request match {
+      case req: AddOffsetsToTxnRequest => AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AddPartitionsToTxnRequest => AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterClientQuotasRequest => AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterConfigsRequest => AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterIsrRequest => AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterPartitionReassignmentsRequest => AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: AlterReplicaLogDirsRequest => AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case res: AlterUserScramCredentialsRequest => AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), request.version())
+      case req: ApiVersionsRequest => ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: BeginQuorumEpochRequest => BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+      case req: ControlledShutdownRequest => ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateAclsRequest => CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DeleteTopicsRequest => DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeAclsRequest => DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeClientQuotasRequest => DescribeClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeConfigsRequest => DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeDelegationTokenRequest => DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeGroupsRequest => DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeLogDirsRequest => DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+      case req: DescribeQuorumRequest => DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+      case res: DescribeUserScramCredentialsRequest =>

[GitHub] [kafka] mjsax merged pull request #9513: MINOR: improve `null` checks for headers

2020-10-29 Thread GitBox


mjsax merged pull request #9513:
URL: https://github.com/apache/kafka/pull/9513


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9513: MINOR: improve `null` checks for headers

2020-10-29 Thread GitBox


mjsax commented on pull request #9513:
URL: https://github.com/apache/kafka/pull/9513#issuecomment-719085653


   > However, it may be overkill since users don't create ConsumerRecord.
   
   Yeah, I tend to agree. Would not hurt to add it, but should not be strictly 
necessary. Will merge this as-is.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9530: MINOR: Fix version verification in system test

2020-10-29 Thread GitBox


ableegoldman commented on pull request #9530:
URL: https://github.com/apache/kafka/pull/9530#issuecomment-719085324


   Cherrypicked to 2.6 & 2.7 cc/ @bbejeck 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9518: KAFKA-10645: Add null check to the array/Iterable values in RecordHeaders constructor

2020-10-29 Thread GitBox


mjsax commented on pull request #9518:
URL: https://github.com/apache/kafka/pull/9518#issuecomment-719084829


   @showuon Seems we worked on this in parallel. Sorry for that...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514621561



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514619891



##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): 
AbstractRequest = apiKey match {
+case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 
1, new HashMap[TopicPartition, MemoryRecords]()).build()
+case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new 
ListOffsetRequestData().toStruct(version), version)
+case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), 
version)
+case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new 
OffsetCommitRequestData(), version)
+case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new 
OffsetFetchRequestData().toStruct(version), version)
+case ApiKeys.FIND_COORDINATOR => new FindCoordinatorRequest(new 
FindCoordinatorRequestData().toStruct(version), version)
+case ApiKeys.JOIN_GROUP => new JoinGroupRequest(new 
JoinGroupRequestData(), version)
+case ApiKeys.HEARTBEAT => new HeartbeatRequest(new 
HeartbeatRequestData().toStruct(version), version)
+case ApiKeys.LEAVE_GROUP => new LeaveGroupRequest(new 
LeaveGroupRequestData().toStruct(version), version)

Review comment:
   I guess this is related to the consistency issue, but I tried not to 
make too many changes. The constructors with the `data` parameters were private 
in some cases, so I used `toStruct`. But for consistency, I'll change it to 
use one style or the other rather than alternating between the two.
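
   To make the mix concrete, the two styles that alternate in the quoted test look 
roughly like this (both lines are lifted from the test above; it assumes the same 
imports and a `version: Short` in scope, and which style is available depends on 
whether a given request class exposes a public data-based constructor):

       // Style 1: construct directly from the auto-generated *RequestData class
       val joinGroup = new JoinGroupRequest(new JoinGroupRequestData(), version)

       // Style 2: construct via the legacy Struct representation, used where the
       // data-based constructor is not public
       val heartbeat =
         new HeartbeatRequest(new HeartbeatRequestData().toStruct(version), version)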

##
File path: core/src/test/scala/unit/kafka/network/RequestConvertToJsonTest.scala
##
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import java.util.HashMap
+
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message._
+import org.junit.Test
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests._
+
+import scala.collection.mutable.ArrayBuffer
+
+class RequestConvertToJsonTest {
+
+  def createRequestsFromApiKey(apiKey: ApiKeys, version: Short): 
AbstractRequest = apiKey match {
+case ApiKeys.PRODUCE => ProduceRequest.Builder.forCurrentMagic(0.toShort, 
1, new HashMap[TopicPartition, MemoryRecords]()).build()
+case ApiKeys.FETCH => new FetchRequest(new FetchRequestData(), version)
+case ApiKeys.LIST_OFFSETS => new ListOffsetRequest(new 
ListOffsetRequestData().toStruct(version), version)
+case ApiKeys.METADATA => new MetadataRequest(new MetadataRequestData(), 
version)
+case ApiKeys.OFFSET_COMMIT => new OffsetCommitRequest(new 
OffsetCommitRequestData(), version)
+case ApiKeys.OFFSET_FETCH => new OffsetFetchRequest(new 

[GitHub] [kafka] ableegoldman merged pull request #9530: MINOR: Fix version verification in system test

2020-10-29 Thread GitBox


ableegoldman merged pull request #9530:
URL: https://github.com/apache/kafka/pull/9530


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #9530: MINOR: Fix version verification in system test

2020-10-29 Thread GitBox


ableegoldman commented on pull request #9530:
URL: https://github.com/apache/kafka/pull/9530#issuecomment-719074084


   One flaky test failed: 
`EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
   
   Confirmed the system tests passed. Merging to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514617626



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514614216



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514610499



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-29 Thread GitBox


vvcephei commented on pull request #9467:
URL: https://github.com/apache/kafka/pull/9467#issuecomment-719063925


   Merged to 2.6 (cc @mimaison )
   
   Thanks again for the contribution, @thake !



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed

2020-10-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10515:
-
Fix Version/s: 2.6.1

> NPE: Foreign key join serde may not be initialized with default serde if 
> application is distributed
> ---
>
> Key: KAFKA-10515
> URL: https://issues.apache.org/jira/browse/KAFKA-10515
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.5.1
>Reporter: Thorsten Hake
>Priority: Critical
> Fix For: 2.7.0, 2.6.1
>
>
> The fix for KAFKA-9517 fixed the initialization of the foreign-key join 
> serdes for Kafka Streams applications that do not run distributed over multiple 
> instances.
> However, if an application runs distributed over multiple instances, the 
> foreign-key join serdes may still not be initialized, leading to the following 
> NPE:
> {noformat}
> Encountered the following error during 
> processing:java.lang.NullPointerException: null
>   at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85)
>   at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52)
>   at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
>   at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
>   at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
>   at 
> org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
>   at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102)
>   at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670){noformat}
> This happens because the processors for foreign key joins will be distributed 
> across multiple tasks. The serde will only be initialized with the default 
> serde during the initialization of the task containing the sink node 
> ("subscription-registration-sink"). So if the task containing the 
> SubscriptionStoreReceiveProcessor ("subscription-receive") is not assigned to 
> the same instance as the task 

[GitHub] [kafka] vvcephei merged pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-29 Thread GitBox


vvcephei merged pull request #9467:
URL: https://github.com/apache/kafka/pull/9467


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514594596



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -210,65 +142,42 @@ public String toString() {
 }
 }
 
+/**
+ * We have to copy acks, timeout, transactionalId and partitionSizes from 
data since data may be reset to eliminate
+ * the reference to ByteBuffer, but this metadata is still useful.
+ */
 private final short acks;
 private final int timeout;
 private final String transactionalId;
-
-private final Map partitionSizes;
-
+// visible for testing
+final Map partitionSizes;
+private boolean hasTransactionalRecords = false;
+private boolean hasIdempotentRecords = false;
 // This is set to null by `clearPartitionRecords` to prevent unnecessary 
memory retention when a produce request is
 // put in the purgatory (due to client throttling, it can take a while 
before the response is sent).
 // Care should be taken in methods that use this field.
-private volatile Map partitionRecords;
-private boolean hasTransactionalRecords = false;
-private boolean hasIdempotentRecords = false;
-
-private ProduceRequest(short version, short acks, int timeout, 
Map partitionRecords, String transactionalId) {
-super(ApiKeys.PRODUCE, version);
-this.acks = acks;
-this.timeout = timeout;
-
-this.transactionalId = transactionalId;
-this.partitionRecords = partitionRecords;
-this.partitionSizes = createPartitionSizes(partitionRecords);
+private volatile ProduceRequestData data;
 
-for (MemoryRecords records : partitionRecords.values()) {
-setFlags(records);
-}
-}
-
-private static Map 
createPartitionSizes(Map partitionRecords) {
-Map result = new 
HashMap<>(partitionRecords.size());
-for (Map.Entry entry : 
partitionRecords.entrySet())
-result.put(entry.getKey(), entry.getValue().sizeInBytes());
-return result;
-}
-
-public ProduceRequest(Struct struct, short version) {
+public ProduceRequest(ProduceRequestData produceRequestData, short 
version) {
 super(ApiKeys.PRODUCE, version);
-partitionRecords = new HashMap<>();
-for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-Struct topicData = (Struct) topicDataObj;
-String topic = topicData.get(TOPIC_NAME);
-for (Object partitionResponseObj : 
topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-Struct partitionResponse = (Struct) partitionResponseObj;
-int partition = partitionResponse.get(PARTITION_ID);
-MemoryRecords records = (MemoryRecords) 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-setFlags(records);
-partitionRecords.put(new TopicPartition(topic, partition), 
records);
-}
-}
-partitionSizes = createPartitionSizes(partitionRecords);
-acks = struct.getShort(ACKS_KEY_NAME);
-timeout = struct.getInt(TIMEOUT_KEY_NAME);
-transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-}
-
-private void setFlags(MemoryRecords records) {
-Iterator iterator = records.batches().iterator();
-MutableRecordBatch entry = iterator.next();
-hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-hasTransactionalRecords = hasTransactionalRecords || 
entry.isTransactional();
+this.data = produceRequestData;
+this.data.topicData().forEach(topicProduceData -> 
topicProduceData.partitions()
+.forEach(partitionProduceData -> {
+MemoryRecords records = 
MemoryRecords.readableRecords(partitionProduceData.records());
+Iterator iterator = 
records.batches().iterator();
+MutableRecordBatch entry = iterator.next();
+hasIdempotentRecords = hasIdempotentRecords || 
entry.hasProducerId();

Review comment:
   On the other hand, we might want to move this logic into a helper in 
`KafkaApis` so that these objects are dedicated only to serialization logic. 
Eventually we'll want to get rid of `ProduceRequest` and just use 
`ProduceRequestData`.
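
   As a rough sketch of what such a helper could look like (the name and placement 
are hypothetical; it only reuses the calls already visible in the diff above, i.e. 
topicData(), partitions(), records(), hasProducerId() and isTransactional()):

       import org.apache.kafka.common.message.ProduceRequestData
       import org.apache.kafka.common.record.MemoryRecords

       // Hypothetical KafkaApis-side helper: derive the idempotent/transactional
       // flags from ProduceRequestData so the request class stays serialization-only.
       def produceRequestFlags(data: ProduceRequestData): (Boolean, Boolean) = {
         var hasIdempotent = false
         var hasTransactional = false
         data.topicData().forEach { topic =>
           topic.partitions().forEach { partition =>
             val records = MemoryRecords.readableRecords(partition.records())
             val firstBatch = records.batches().iterator().next()
             hasIdempotent = hasIdempotent || firstBatch.hasProducerId
             hasTransactional = hasTransactional || firstBatch.isTransactional
           }
         }
         (hasIdempotent, hasTransactional)
       }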





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gitlw opened a new pull request #9533: Show log end offset during truncation to help estimate data loss during ULE

2020-10-29 Thread GitBox


gitlw opened a new pull request #9533:
URL: https://github.com/apache/kafka/pull/9533


   During an unclean leader election, there can be data loss due to truncation 
at the resigned leader.
   This PR adds more logging to help understand the scale of message loss 
during an unclean leader election.
   
   Suppose there are 3 brokers that has replicas for a given partition:
   Broker A (leader) with largest offset 9 (log end offset 10)
   Broker B (follower) with largest offset 4 (log end offset 5)
   Broker C (follower) with largest offset 1 (log end offset 2)
   
   Only the leader A is in the ISR with B and C lagging behind.
   Now an unclean leader election causes the leadership to be transferred to C. 
Broker A would need to truncate 8 messages, and Broker B 3 messages.
   
   Case 1: if these messages have been produced with acks=0 or 1, then clients 
would experience 8 lost messages.
   Case 2: if the client is using acks=all and the partition's minISR setting 
is 2, and further let's assume broker B dropped out of the ISR after receiving 
the message with offset 4, then only the messages with offset<=4 have been 
acked to the client. The truncation effectively causes the client to lose 3 
messages.
   
   Knowing the exact amount of data loss involves knowing the client's acks 
setting when the messages are produced, and also whether the messages have been 
sufficiently replicated according to the MinISR setting.
   Without getting too involved, this PR reduces the requirement from getting 
the exact data loss numbers to getting an ESTIMATE of the data loss.
   Specifically this PR adds logs during truncation to show the log end offset, 
number of messages truncated, and number of bytes truncated.
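
   A standalone sketch of the kind of estimate such a log line can provide 
(illustrative only; the object name, byte figures and log format are placeholders, 
not the actual patch):

       object TruncationLossEstimate {
         // Given log end offsets and log sizes before/after truncation, report an
         // upper bound on client-visible loss, as described above.
         def logTruncation(topicPartition: String,
                           oldEndOffset: Long, newEndOffset: Long,
                           oldSizeInBytes: Long, newSizeInBytes: Long): String = {
           val messagesTruncated = math.max(0L, oldEndOffset - newEndOffset)
           val bytesTruncated = math.max(0L, oldSizeInBytes - newSizeInBytes)
           s"Truncating $topicPartition from log end offset $oldEndOffset to $newEndOffset " +
             s"(~$messagesTruncated messages, ~$bytesTruncated bytes); this is an upper bound " +
             s"on acked-message loss, since not every truncated message was necessarily acked."
         }

         def main(args: Array[String]): Unit = {
           // Broker A from the example above: log end offset 10 truncated to C's end offset 2,
           // i.e. 8 messages; the byte figures are made up for the example.
           println(logTruncation("topic-0", 10L, 2L, oldSizeInBytes = 10240L, newSizeInBytes = 2048L))
         }
       }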
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514585353



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java
##
@@ -26,30 +24,53 @@
 
 interface Listener {
 /**
- * Callback which is invoked when records written through {@link 
#scheduleAppend(int, List)}
- * become committed.
+ * Callback which is invoked for all records committed to the log.
+ * It is the responsibility of the caller to invoke {@link 
BatchReader#close()}
+ * after consuming the reader.
  *
  * Note that there is not a one-to-one correspondence between writes 
through
  * {@link #scheduleAppend(int, List)} and this callback. The Raft 
implementation
  * is free to batch together the records from multiple append calls 
provided
  * that batch boundaries are respected. This means that each batch 
specified
  * through {@link #scheduleAppend(int, List)} is guaranteed to be a 
subset of
- * a batch passed to {@link #handleCommit(int, long, List)}.
+ * a batch provided by the {@link BatchReader}.
+ *
+ * @param reader reader instance which must be iterated and closed
+ */
+void handleCommit(BatchReader reader);
+
+/**
+ * Invoked after this node has become a leader. This is only called 
after
+ * all commits up to the start of the leader's epoch have been sent to
+ * {@link #handleCommit(BatchReader)}.
+ *
+ * After becoming a leader, the client is eligible to write to the log
+ * using {@link #scheduleAppend(int, List)}.
  *
- * @param epoch the epoch in which the write was accepted
- * @param lastOffset the offset of the last record in the record list
- * @param records the set of records that were committed
+ * @param epoch the claimed leader epoch
  */
-void handleCommit(int epoch, long lastOffset, List records);
+default void handleClaim(int epoch) {}

Review comment:
   Yeah, I considered using `handleBecomeLeader` and `handleResignLeader`. 
In the end, I decided to use the more concise `handleClaim` and `handleResign` 
names, which are used in the KIP-500 branch. 
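
   For readers following the interface change, a listener wired to these callbacks 
could look roughly like the sketch below. It assumes the generic type parameters 
that the archive stripped (Listener and BatchReader parameterized by the record 
type) and that the reader iterates like a java.util.Iterator of batches; the rest 
follows the javadoc quoted above (iterate, then close):

       // Sketch only: `Array[Byte]` as the record type and the batch handling are
       // placeholders, not the real state machine.
       class CountingListener extends RaftClient.Listener[Array[Byte]] {
         @volatile private var claimedEpoch: Option[Int] = None

         override def handleCommit(reader: BatchReader[Array[Byte]]): Unit = {
           try {
             while (reader.hasNext) {
               val batch = reader.next()
               // apply `batch` to the local state machine here
             }
           } finally {
             reader.close() // the listener is responsible for closing the reader
           }
         }

         override def handleClaim(epoch: Int): Unit = {
           // invoked only after all commits up to the start of this epoch were delivered
           claimedEpoch = Some(epoch)
         }
       }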





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514584469



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) 
throws IOException {
 }
 }
 
+private void pollListeners() {
+// Register any listeners added since the last poll
+while (!pendingListeners.isEmpty()) {
+Listener listener = pendingListeners.poll();
+listenerContexts.add(new ListenerContext(listener));
+}
+
+// Check listener progress to see if reads are expected
+quorum.highWatermark().ifPresent(highWatermarkMetadata -> {
+long highWatermark = highWatermarkMetadata.offset;
+
+List listenersToUpdate = listenerContexts.stream()
+.filter(listenerContext -> {
+OptionalLong nextExpectedOffset = 
listenerContext.nextExpectedOffset();
+return nextExpectedOffset.isPresent() && 
nextExpectedOffset.getAsLong() < highWatermark;
+})
+.collect(Collectors.toList());
+
+maybeFireHandleCommit(listenersToUpdate, 
highWatermarkMetadata.offset);
+});
+}
+
 public void poll() throws IOException {
 GracefulShutdown gracefulShutdown = shutdown.get();
 if (gracefulShutdown != null) {
 pollShutdown(gracefulShutdown);
 } else {
+pollListeners();

Review comment:
   Hmm, that's a fair question. I think the listeners will tend to get new 
data in two cases: 1) the high watermark advances, or 2) a previous read completes. 
In the first case, the high watermark only advances in response to a request, 
so there should be no delay. In the second case, we call `wakeup()` to take us 
out of the network poll, so I think there should also be no delay. Can you 
think of a case where there would be a delay?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on a change in pull request #9532: MINOR: Move upgraded docs from site to kafka docs

2020-10-29 Thread GitBox


bbejeck commented on a change in pull request #9532:
URL: https://github.com/apache/kafka/pull/9532#discussion_r514580970



##
File path: docs/upgrade.html
##
@@ -21,6 +21,48 @@
 
 Notable changes in 
2.8.0
 
+Upgrading to 2.7.0 from any 
version 0.8.x through 2.6.x

Review comment:
   This whole section is new here, but it should just be the standard upgrade instructions

##
File path: docs/documentation.html
##
@@ -22,51 +22,56 @@
 
 
 
 Documentation
 Kafka 2.8 Documentation
 Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X, 0.9.0.X, 0.10.0.X, 0.10.1.X, 0.10.2.X, 0.11.0.X, 1.0.X, 1.1.X, 2.0.X, 2.1.X, 2.2.X, 2.3.X, 2.4.X, 2.5.X, 2.6.X, 2.7.X.
 
-
-
-1. Getting 
Started
-  1.1 Introduction
+   1. Getting Started
+  1.1 Introduction
   
-  1.2 Use Cases
+  1.2 Use Cases
   
-  1.3 Quick Start
-  
-  1.4 Ecosystem
+  1.3 Quick Start
+  
+  1.4 Ecosystem
   
-  1.5 Upgrading From Previous 
Versions
+  1.5 Upgrading From Previous Versions
   
 
-2. APIs
+2. APIs
 
 
 
-3. Configuration
+3. Configuration
 
 
 
-4. Design
+4. Design
 
 
 
-5. 
Implementation
+5. Implementation
 
 
 
-6. Operations
+6. Operations
 
 
 
-7. Security
+7. Security
 
 
-8. Kafka Connect
+8. Kafka Connect

Review comment:
   Current 2.6 docs part of AK site update

##
File path: docs/documentation.html
##
@@ -22,51 +22,56 @@
 
 
 
 (markup-only lines; the HTML tags were stripped by the archive)

Review comment:
   Current docs, part of AK site upgrade

##
File path: docs/upgrade.html
##
@@ -93,8 +135,49 @@ Notable changes in 2
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-616%3A+Rename+implicit+Serdes+instances+in+kafka-streams-scala;>KIP-616
 
 
+Upgrading to 2.6.0 from any 
version 0.8.x through 2.5.x
+
+If you are upgrading from a version prior to 2.1.x, please see the note 
below about the change to the schema used to store consumer offsets.
+Once you have changed the inter.broker.protocol.version to the latest 
version, it will not be possible to downgrade to a version prior to 2.1.
+
+For a rolling upgrade:
 
-Notable changes in 
2.6.0
+
+ Update server.properties on all brokers and add the following 
properties. CURRENT_KAFKA_VERSION refers to the version you
+are upgrading from. CURRENT_MESSAGE_FORMAT_VERSION refers to the 
message format version currently in use. If you have previously
+overridden the message format version, you should keep its current 
value. Alternatively, if you are upgrading from a version prior
+to 0.11.0.x, then CURRENT_MESSAGE_FORMAT_VERSION should be set to 
match CURRENT_KAFKA_VERSION.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g., 
2.5, 2.4, etc.)
+log.message.format.version=CURRENT_MESSAGE_FORMAT_VERSION  
(See potential performance impact
+following the upgrade for the details on what this 
configuration does.)
+
+If you are upgrading from version 0.11.0.x or above, and you have not 
overridden the message format, then you only need to override
+the inter-broker protocol version.
+
+inter.broker.protocol.version=CURRENT_KAFKA_VERSION (e.g., 
2.5, 2.4, etc.)
+
+
+ Upgrade the brokers one at a time: shut down the broker, update the 
code, and restart it. Once you have done so, the
+brokers will be running the latest version and you can verify that the 
cluster's behavior and performance meets expectations.
+It is still possible to downgrade at this point if there are any 
problems.
+
+ Once the cluster's behavior and performance has been verified, bump 
the protocol version by editing
+inter.broker.protocol.version and setting it to 
2.6.
+
+ Restart the brokers one by one for the new protocol version to take 
effect. Once the brokers begin using the latest
+protocol version, it will no longer be possible to downgrade the 
cluster to an older version.
+
+ If you have overridden the message format version as instructed 
above, then you need to do one more rolling restart to
+upgrade it to its latest version. Once all (or most) consumers have 
been upgraded to 0.11.0 or later,
+change log.message.format.version to 2.6 on each broker and restart 
them one by one. Note that the older Scala clients,
+which are no longer maintained, do not support the message format 
introduced in 0.11, so to avoid conversion costs
+(or to take advantage of exactly once semantics),
+the newer Java clients must be used.
+
+
+
+Notable changes in 
2.6.0

Review comment:
   Current 2.6 docs

##
File path: docs/upgrade.html

[GitHub] [kafka] bbejeck commented on pull request #9532: MINOR: Move upgraded docs from site to kafak docs

2020-10-29 Thread GitBox


bbejeck commented on pull request #9532:
URL: https://github.com/apache/kafka/pull/9532#issuecomment-719035513


   Note that most of this PR already exists in 2.6 and doesn't need a close look



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


lbradstreet commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514577992



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 

[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


lbradstreet commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514577021



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##

[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


lbradstreet commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514576105



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##

[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


lbradstreet commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514575372



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##

[GitHub] [kafka] lbradstreet commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


lbradstreet commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514574814



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -162,7 +161,7 @@ object RequestChannel extends Logging {
   }
 }
 
-trace(s"Processor $processor received request: ${requestDesc(true)}")
+trace(s"Processor $processor received request: 
${RequestConvertToJson.requestDesc(header, loggableRequest, true).toString}")

Review comment:
   Would we unnecessarily calculate it here? I think the macro will avoid 
it in this case, but I'm fine with gating it anyway.
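
   For context on the macro point: Kafka core's logging goes through the scala-logging macros, which expand a call like trace(msg) into an isTraceEnabled guard, so the interpolated string should not be built unless trace logging is actually on. A rough Java sketch of the same lazy-message idea (hypothetical trace helper, not Kafka's real logging API):

    import java.util.function.Supplier;

    public final class LazyTraceExample {
        // Stand-in for a real logger; only the enabled check matters for the illustration.
        private static final boolean TRACE_ENABLED = false;

        // The message supplier is invoked only when trace logging is enabled, so an
        // expensive conversion (e.g. request -> JSON) is skipped otherwise.
        static void trace(Supplier<String> message) {
            if (TRACE_ENABLED) {
                System.out.println("TRACE " + message.get());
            }
        }

        public static void main(String[] args) {
            trace(() -> "received request: " + expensiveToJson());
        }

        private static String expensiveToJson() {
            return "{...}"; // placeholder for a costly serialization
        }
    }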





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331 add a streams handler

2020-10-29 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r514566700



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -346,26 +351,89 @@ public void setStateListener(final 
KafkaStreams.StateListener listener) {
  * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
  * terminates due to an uncaught exception.
  *
- * @param eh the uncaught exception handler for all internal threads; 
{@code null} deletes the current handler
+ * @param uncaughtExceptionHandler the uncaught exception handler for all 
internal threads; {@code null} deletes the current handler
  * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ *
+ * @Deprecated Since 2.7.0. Use {@link 
KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} 
instead.
+ *
  */
-public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh) {
+public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
 synchronized (stateLock) {
 if (state == State.CREATED) {
 for (final StreamThread thread : threads) {
-thread.setUncaughtExceptionHandler(eh);
+
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
 }
 
 if (globalStreamThread != null) {
-globalStreamThread.setUncaughtExceptionHandler(eh);
+
globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
 }
 } else {
 throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
-"Current state is: " + state);
+"Current state is: " + state);
 }
 }
 }
 
+/**
+ * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread}
+ * throws an unexpected exception.
+ * These might be exceptions indicating rare bugs in Kafka Streams, or they
+ * might be exceptions thrown by your code, for example a 
NullPointerException thrown from your processor
+ * logic.
+ * 
+ * Note, this handler must be threadsafe, since it will be shared among 
all threads, and invoked from any
+ * thread that encounters such an exception.
+ *
+ * @param streamsUncaughtExceptionHandler the uncaught exception handler 
of type {@link StreamsUncaughtExceptionHandler} for all internal threads; 
{@code null} deletes the current handler
+ * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ * @throws NullPointerException @NotNull if 
streamsUncaughtExceptionHandler is null.
+ */
+public void setUncaughtExceptionHandler(final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler handler = exception -> 
handleStreamsUncaughtException(exception, streamsUncaughtExceptionHandler);
+synchronized (stateLock) {
+if (state == State.CREATED) {
+Objects.requireNonNull(streamsUncaughtExceptionHandler);
+for (final StreamThread thread : threads) {
+thread.setStreamsUncaughtExceptionHandler(handler);
+}
+if (globalStreamThread != null) {
+globalStreamThread.setUncaughtExceptionHandler(handler);
+}
+} else {
+throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
+"Current state is: " + state);
+}
+}
+}
+
+private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
handleStreamsUncaughtException(final Throwable e,
+   
  final StreamsUncaughtExceptionHandler 
streamsUncaughtExceptionHandler) {
+final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(e);
+switch (action) {
+//case REPLACE_STREAM_THREAD:

Review comment:
   It will. I don't know if we should merge it as a comment or just add it later.
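
   For readers following the new API in this diff, here is a minimal sketch of how an application might register the handler once the PR lands. The enum constant SHUTDOWN_CLIENT below is an assumption based on KIP-671 and may not match the final names; per the diff above, registration must happen while the instance is still in CREATED state.

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    import java.util.Properties;

    public class HandlerRegistrationSketch {
        public static void main(String[] args) {
            Properties props = new Properties(); // application.id, bootstrap.servers, etc.
            KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props);

            // Must be called before start(), i.e. while the instance is in CREATED state.
            streams.setUncaughtExceptionHandler(exception -> {
                // Inspect the exception and pick a response; constant name assumed from KIP-671.
                return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
            });

            streams.start();
        }
    }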





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331 add a streams handler

2020-10-29 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r514567028



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect the exception received in a stream thread and respond with an 
action.
+ * @param exception the actual exception
+ */
+StreamThreadExceptionResponse handle(final Throwable exception);
+
+/**
+ * Enumeration that describes the response from the exception handler.
+ */
+enum StreamThreadExceptionResponse {
+//REPLACE_THREAD(0, "REPLACE_THREAD"),

Review comment:
   Same as the other use in KS





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9487: KAFKA-9331 add a streams handler

2020-10-29 Thread GitBox


lct45 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r514535737



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -282,6 +284,15 @@ public boolean isRunning() {
 private final Admin adminClient;
 private final InternalTopologyBuilder builder;
 
+

Review comment:
   two new lines in a row

##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
##
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+public interface StreamsUncaughtExceptionHandler {
+/**
+ * Inspect the exception received in a stream thread and respond with an 
action.
+ * @param exception the actual exception
+ */
+StreamThreadExceptionResponse handle(final Throwable exception);
+
+/**
+ * Enumeration that describes the response from the exception handler.
+ */
+enum StreamThreadExceptionResponse {
+//REPLACE_THREAD(0, "REPLACE_THREAD"),

Review comment:
   Supposed to be here?

##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -616,13 +623,22 @@ public void 
shouldNotSetGlobalRestoreListenerAfterStarting() {
 public void 
shouldThrowExceptionSettingUncaughtExceptionHandlerNotInCreateState() {
 final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
 streams.start();
-try {
-streams.setUncaughtExceptionHandler(null);
-fail("Should throw IllegalStateException");
-} catch (final IllegalStateException e) {
-// expected
-}
+assertThrows(IllegalStateException.class, () -> 
streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+}
+
+@Test
+public void 
shouldThrowExceptionSettingStreamsUncaughtExceptionHandlerNotInCreateState() {
+final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+streams.start();
+assertThrows(IllegalStateException.class, () -> 
streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+
 }
+@Test
+public void 
shouldThrowNullPointerExceptionSettingStreamsUncaughtExceptionHandlerIfNull() {
+final KafkaStreams streams = new 
KafkaStreams(getBuilderWithSource().build(), props, supplier, time);
+assertThrows(NullPointerException.class, () -> 
streams.setUncaughtExceptionHandler((StreamsUncaughtExceptionHandler) null));
+}
+

Review comment:
   extra line

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -346,26 +351,89 @@ public void setStateListener(final 
KafkaStreams.StateListener listener) {
  * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
  * terminates due to an uncaught exception.
  *
- * @param eh the uncaught exception handler for all internal threads; 
{@code null} deletes the current handler
+ * @param uncaughtExceptionHandler the uncaught exception handler for all 
internal threads; {@code null} deletes the current handler
  * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ *
+ * @Deprecated Since 2.7.0. Use {@link 
KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} 
instead.
+ *
  */
-public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh) {
+public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
 synchronized (stateLock) {
 if (state == State.CREATED) {
 for (final StreamThread thread : threads) {
-thread.setUncaughtExceptionHandler(eh);
+
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
 }
 
 if (globalStreamThread != null) {
-

[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514539737



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -329,8 +387,9 @@ private void appendLeaderChangeMessage(LeaderState state, 
long currentTimeMs) {
 }
 
 private void flushLeaderLog(LeaderState state, long currentTimeMs) {
-log.flush();
+// We update the end offset before flushing so that parked fetches can 
return sooner

Review comment:
   Yeah, it's ok for followers to see uncommitted or even unflushed data. 
The main thing is that we avoid advancing the high watermark until the fsync 
completes. Note that this is the main reason that we had to do KAFKA-10527. 
Without this fix, it was possible for the leader to continue in the same epoch 
after a restart, which means that it could lose and overwrite unflushed data.
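
   A schematic of the ordering being described (field names here are illustrative, not the actual KafkaRaftClient members): the log end offset can move and unblock parked fetches before the fsync, but the high watermark is only allowed to advance once the flush has completed.

    // Illustrative sketch only; the real implementation also requires majority
    // replication before the high watermark can move.
    class LeaderFlushOrderingSketch {
        private long logEndOffset = 0;   // visible to follower fetches right away
        private long flushedOffset = 0;  // advanced only after fsync
        private long highWatermark = 0;  // never allowed to pass flushedOffset

        void appendAsLeader(int recordCount) {
            logEndOffset += recordCount;   // parked fetches may return this data immediately
            fsyncLog();                    // durability point
            flushedOffset = logEndOffset;
            highWatermark = Math.min(logEndOffset, flushedOffset);
        }

        private void fsyncLog() {
            // e.g. force the active log segment to disk
        }
    }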





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514537694



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, 
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save 
some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+continue;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List 
records) {

Review comment:
   The only difference is the input. I will add some comments to try and 
clarify the usage.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514536828



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List listenerContexts, 
long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to save 
some work
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+continue;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset < highWatermark) {
+LogFetchInfo readInfo = log.read(nextExpectedOffset, 
Isolation.COMMITTED);
+listenerContext.fireHandleCommit(nextExpectedOffset, 
readInfo.records);
+}
+}
+}
+
+private void maybeFireHandleCommit(long baseOffset, int epoch, List 
records) {
+for (ListenerContext listenerContext : listenerContexts) {
+OptionalLong nextExpectedOffsetOpt = 
listenerContext.nextExpectedOffset();
+if (!nextExpectedOffsetOpt.isPresent()) {
+continue;
+}
+
+long nextExpectedOffset = nextExpectedOffsetOpt.getAsLong();
+if (nextExpectedOffset == baseOffset) {
+listenerContext.fireHandleCommit(baseOffset, epoch, records);
+}
+}
+}
+
+private void maybeFireHandleClaim(LeaderState state) {
+for (ListenerContext listenerContext : listenerContexts) {
+int leaderEpoch = state.epoch();

Review comment:
   I will move this outside the loop.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514535761



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/ThresholdPurgatory.java
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.raft.ExpirationService;
+
+import java.util.NavigableMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ThresholdPurgatory<T extends Comparable<T>> implements FuturePurgatory<T> {
+private final AtomicLong idGenerator = new AtomicLong(0);
+private final ExpirationService expirationService;
+private final ConcurrentNavigableMap, 
CompletableFuture> thresholdMap =
+new ConcurrentSkipListMap<>();
+
+public ThresholdPurgatory(ExpirationService expirationService) {
+this.expirationService = expirationService;
+}
+
+@Override
+public CompletableFuture await(T threshold, long maxWaitTimeMs) {
+ThresholdKey key = new 
ThresholdKey<>(idGenerator.incrementAndGet(), threshold);
+CompletableFuture future = 
expirationService.await(maxWaitTimeMs);
+thresholdMap.put(key, future);
+future.whenComplete((timeMs, exception) -> thresholdMap.remove(key));
+return future;
+}
+
+@Override
+public void maybeComplete(T value, long currentTimeMs) {
+ThresholdKey maxKey = new ThresholdKey<>(Long.MAX_VALUE, value);
+NavigableMap, CompletableFuture> submap = 
thresholdMap.headMap(maxKey);
+for (CompletableFuture completion : submap.values()) {
+completion.complete(currentTimeMs);
+}
+}
+
+@Override
+public void completeAll(long currentTimeMs) {
+for (CompletableFuture completion : thresholdMap.values()) {
+completion.complete(currentTimeMs);
+}
+}
+
+@Override
+public void completeAllExceptionally(Throwable exception) {
+for (CompletableFuture completion : thresholdMap.values()) {
+completion.completeExceptionally(exception);
+}
+}
+
+@Override
+public int numWaiting() {
+return thresholdMap.size();
+}
+
+private static class ThresholdKey<T extends Comparable<T>> implements Comparable<ThresholdKey<T>> {

Review comment:
   Not at the moment. I guess your point is that we might be able to drop 
the generics, which is fair. I think we can also drop the `FuturePurgatory` 
interface. Is it ok if we save this for a follow-up?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514533712



##
File path: 
raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
##
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.internals;
+
+import org.apache.kafka.common.protocol.DataInputStreamReadable;
+import org.apache.kafka.common.protocol.Readable;
+import org.apache.kafka.common.record.BufferSupplier;
+import org.apache.kafka.common.record.DefaultRecordBatch;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MutableRecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.RecordSerde;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.OptionalLong;
+
+public class RecordsBatchReader<T> implements BatchReader<T> {

Review comment:
   `MemoryBatchReader` is only used for writes from the leader. We retain 
the original records from the call to `scheduleAppend` and send them to the 
listener in `handleCommit`. This is useful because it ensures the active 
controller will not need to read from disk.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9487: KAFKA-9331 add a streams handler

2020-10-29 Thread GitBox


lct45 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r514527605



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -346,26 +351,89 @@ public void setStateListener(final 
KafkaStreams.StateListener listener) {
  * Set the handler invoked when a {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG internal thread} abruptly
  * terminates due to an uncaught exception.
  *
- * @param eh the uncaught exception handler for all internal threads; 
{@code null} deletes the current handler
+ * @param uncaughtExceptionHandler the uncaught exception handler for all 
internal threads; {@code null} deletes the current handler
  * @throws IllegalStateException if this {@code KafkaStreams} instance is 
not in state {@link State#CREATED CREATED}.
+ *
+ * @Deprecated Since 2.7.0. Use {@link 
KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} 
instead.
+ *
  */
-public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler eh) {
+public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
 synchronized (stateLock) {
 if (state == State.CREATED) {
 for (final StreamThread thread : threads) {
-thread.setUncaughtExceptionHandler(eh);
+
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
 }
 
 if (globalStreamThread != null) {
-globalStreamThread.setUncaughtExceptionHandler(eh);
+
globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
 }
 } else {
 throw new IllegalStateException("Can only set 
UncaughtExceptionHandler in CREATED state. " +
-"Current state is: " + state);
+"Current state is: " + state);

Review comment:
   Is this spacing on purpose?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck opened a new pull request #9532: MINOR: Move upgraded docs from site to kafka docs

2020-10-29 Thread GitBox


bbejeck opened a new pull request #9532:
URL: https://github.com/apache/kafka/pull/9532


   For the 2.7 release, we need to migrate some docs changes that went to 
`kafka-site` but didn't go into `kafka/docs`
   
   This PR covers the `documentation.html` and `upgrade.html` changes.  Once 
these are merged to trunk, I'll cherry-pick them to the 2.7 branch
   
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-10-29 Thread GitBox


hachikuji opened a new pull request #9531:
URL: https://github.com/apache/kafka/pull/9531


   When initializing the raft state machine after shutting down as a leader, we 
were previously entering the "unattached" state, which means we have no leader 
and no voted candidate. This was a bug because it allowed a reinitialized 
leader to cast a vote for a candidate in the same epoch that it was already the 
leader of. This patch fixes the problem by introducing a new "resigned" state 
which allows us to retain the leader state so that we cannot change our vote 
and we will not accept additional appends.
   
   This patch also revamps the shutdown logic to make use of the new "resigned" 
state. Previously we had a separate path in `KafkaRaftClient.poll` for the 
shutdown logic which resulted in some duplication. Instead now we incorporate 
shutdown behavior into each state's respective logic.
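
   A toy sketch of the invariant described above (names are illustrative, not the actual quorum state classes): a node that led epoch E and then reinitializes into the resigned state must not grant a vote for epoch E, otherwise a second leader could be elected for that epoch.

    // Illustrative only; not the actual QuorumState/ResignedState implementation.
    final class ResignedStateSketch {
        private final int epoch; // the epoch this node was leading before it shut down

        ResignedStateSketch(int epoch) {
            this.epoch = epoch;
        }

        boolean canGrantVote(int candidateEpoch) {
            // Because the resigned state retains the old leader's epoch, a vote is only
            // granted for a strictly later epoch; voting again in our own epoch could
            // elect a second leader for it, which is the bug described above.
            return candidateEpoch > epoch;
        }
    }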
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda merged pull request #9529: Revert initial principal from 2.7

2020-10-29 Thread GitBox


abbccdda merged pull request #9529:
URL: https://github.com/apache/kafka/pull/9529


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514518360



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an 
interface
+ * is that it allows us to push blocking operations such as reads from disk 
outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will 
not
+ * affect replication.
+ *
+ * @param  record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, Closeable {
+
+/**
+ * Get the base offset of the readable batches. Note that this value is a 
constant
+ * which is defined when the {@link BatchReader} instance is constructed. 
It does
+ * not change based on reader progress.
+ *
+ * @return the base offset
+ */
+long baseOffset();
+
+/**
+ * Get the last offset of the batch if it is known. When reading from 
disk, we may
+ * not know the last offset of a set of records until it has been read 
from disk.
+ * In this case, the state machine cannot advance to the next committed 
data until
+ * all batches from the {@link BatchReader} instance have been consumed.
+ *
+ * @return optional last offset
+ */
+OptionalLong lastOffset();
+
+/**
+ * Close this reader. It is the responsibility of the {@link 
RaftClient.Listener}
+ * to close each reader passed to {@link 
RaftClient.Listener#handleCommit(BatchReader)}.
+ */
+@Override
+void close();
+
+class Batch {
+private final long baseOffset;
+private final int epoch;
+private final List records;
+
+public Batch(long baseOffset, int epoch, List records) {
+this.baseOffset = baseOffset;
+this.epoch = epoch;
+this.records = records;
+}
+
+public long lastOffset() {
+return baseOffset + records.size() - 1;
+}
+
+public long baseOffset() {
+return baseOffset;
+}
+
+public List records() {
+return records;
+}
+
+public int epoch() {
+return epoch;
+}
+
+@Override
+public String toString() {
+return "Batch(" +
+"baseOffset=" + baseOffset +
+", epoch=" + epoch +
+", records=" + records +
+')';
+}
+
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (o == null || getClass() != o.getClass()) return false;
+Batch batch = (Batch) o;
+return baseOffset == batch.baseOffset &&
+epoch == batch.epoch &&
+Objects.equals(records, batch.records);

Review comment:
   We are relying on the standard `equals` method. I think it's up to the 
user of the api to ensure a reasonable implementation if they expect to rely on 
batch equality. The raft implementation does not set any expectations on record 
equality, but it is useful in testing where we can control the record type.
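
   To make the ownership contract above concrete, here is a hedged sketch of how a listener might drain a reader in handleCommit, relying only on the methods shown in this interface (iteration over Batch values plus close()); the String record type and the class name are just for illustration.

    import org.apache.kafka.raft.BatchReader;

    final class CommitConsumerSketch {
        void handleCommit(BatchReader<String> reader) {
            try {
                while (reader.hasNext()) {
                    BatchReader.Batch<String> batch = reader.next();
                    for (String record : batch.records()) {
                        apply(batch.baseOffset(), batch.epoch(), record);
                    }
                }
            } finally {
                reader.close(); // the listener owns the reader and must close it
            }
        }

        private void apply(long baseOffset, int epoch, String record) {
            // state machine update goes here
        }
    }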





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10663) Flakey test ConsumerBounceTest#testSeekAndCommitWithBrokerFailures

2020-10-29 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10663:
---

 Summary: Flakey test 
ConsumerBounceTest#testSeekAndCommitWithBrokerFailures
 Key: KAFKA-10663
 URL: https://issues.apache.org/jira/browse/KAFKA-10663
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.7.0
Reporter: Boyang Chen


org.apache.kafka.common.KafkaException: Socket server failed to bind to 
localhost:40823: Address already in use. at 
kafka.network.Acceptor.openServerSocket(SocketServer.scala:671) at 
kafka.network.Acceptor.<init>(SocketServer.scala:539) at 
kafka.network.SocketServer.createAcceptor(SocketServer.scala:280) at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:253)
 at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:251)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:251)
 at kafka.network.SocketServer.startup(SocketServer.scala:125) at 
kafka.server.KafkaServer.startup(KafkaServer.scala:303) at 
kafka.utils.TestUtils$.createServer(TestUtils.scala:160) at 
kafka.utils.TestUtils$.createServer(TestUtils.scala:151) at 
kafka.integration.KafkaServerTestHarness.$anonfun$setUp$1(KafkaServerTestHarness.scala:102)
 at scala.collection.Iterator.foreach(Iterator.scala:943) at 
scala.collection.Iterator.foreach$(Iterator.scal

 

From AK 2.7



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514516570



##
File path: raft/src/main/java/org/apache/kafka/raft/BatchReader.java
##
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalLong;
+
+/**
+ * This interface is used to send committed data from the {@link RaftClient}
+ * down to registered {@link RaftClient.Listener} instances.
+ *
+ * The advantage of hiding the consumption of committed batches behind an 
interface
+ * is that it allows us to push blocking operations such as reads from disk 
outside
+ * of the Raft IO thread. This helps to ensure that a slow state machine will 
not
+ * affect replication.
+ *
+ * @param  record type (see {@link org.apache.kafka.raft.RecordSerde})
+ */
+public interface BatchReader<T> extends Iterator<BatchReader.Batch<T>>, Closeable {
+
+/**
+ * Get the base offset of the readable batches. Note that this value is a 
constant
+ * which is defined when the {@link BatchReader} instance is constructed. 
It does
+ * not change based on reader progress.
+ *
+ * @return the base offset
+ */
+long baseOffset();
+
+/**
+ * Get the last offset of the batch if it is known. When reading from 
disk, we may
+ * not know the last offset of a set of records until it has been read 
from disk.
+ * In this case, the state machine cannot advance to the next committed 
data until
+ * all batches from the {@link BatchReader} instance have been consumed.
+ *
+ * @return optional last offset
+ */
+OptionalLong lastOffset();
+
+/**
+ * Close this reader. It is the responsibility of the {@link 
RaftClient.Listener}
+ * to close each reader passed to {@link 
RaftClient.Listener#handleCommit(BatchReader)}.
+ */
+@Override
+void close();
+
+class Batch {
+private final long baseOffset;
+private final int epoch;
+private final List records;
+
+public Batch(long baseOffset, int epoch, List records) {
+this.baseOffset = baseOffset;
+this.epoch = epoch;
+this.records = records;
+}
+
+public long lastOffset() {
+return baseOffset + records.size() - 1;
+}
+
+public long baseOffset() {
+return baseOffset;
+}
+
+public List records() {
+return records;
+}
+
+public int epoch() {
+return epoch;
+}
+
+@Override
+public String toString() {
+return "Batch(" +
+"baseOffset=" + baseOffset +
+", epoch=" + epoch +
+", records=" + records +

Review comment:
   Yeah, I'm relying on the `toString`. I think this is only useful for 
debugging.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514515763



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+protected final DataInputStream input;
+
+public DataInputStreamReadable(DataInputStream input) {
+this.input = input;
+}
+
+@Override
+public byte readByte() {
+try {
+return input.readByte();
+} catch (IOException e) {

Review comment:
   `IOException` is checked, so we cannot raise it from the current 
`Readable` interface. The options are either to add the exception to the 
`Readable` interface or to rethrow it as an unchecked exception. I went with 
the latter to reduce the impact, and because I think we tend to prefer unchecked 
exceptions in general: checked exceptions tend to leak their way through a bunch 
of call stacks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514514051



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+protected final DataInputStream input;
+
+public DataInputStreamReadable(DataInputStream input) {
+this.input = input;
+}
+
+@Override
+public byte readByte() {
+try {
+return input.readByte();
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public short readShort() {
+try {

Review comment:
   Yeah, I could introduce a helper with a lambda, but that would add some 
unnecessary garbage to the deserialization path. Although it is ugly, I think 
the duplication is not a big deal. We probably won't touch this class after it 
is created.
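
   For illustration only, the helper being weighed here might look roughly like the sketch below. The names are invented for this sketch and are not part of the PR; the boxing of each primitive result is the extra garbage mentioned above:

{code:java}
import java.io.DataInputStream;
import java.io.IOException;

class ReadOrThrowSketch {
    @FunctionalInterface
    interface IOSupplier<T> {
        T get() throws IOException;
    }

    private final DataInputStream input;

    ReadOrThrowSketch(DataInputStream input) {
        this.input = input;
    }

    // One shared try/catch instead of repeating it in every readXxx() method.
    private static <T> T readOrThrow(IOSupplier<T> read) {
        try {
            return read.get();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    short readShort() {
        // The short result is boxed to Short and unboxed again, and the bound
        // method reference may allocate, on a hot deserialization path.
        return readOrThrow(input::readShort);
    }
}
{code}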





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514512213



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
##
@@ -113,11 +113,11 @@ protected HistogramSample newSample(long timeMs) {
 protected void update(Sample sample, MetricConfig config, double value, 
long timeMs) {
 final double boundedValue;
 if (value > max) {
-log.warn("Received value {} which is greater than max recordable 
value {}, will be pinned to the max value",
+log.debug("Received value {} which is greater than max recordable 
value {}, will be pinned to the max value",

Review comment:
   The `warn` seemed excessive for a metric update, which could be done 
very frequently. I looked over the code and it looks like we don't really use 
this outside of tests (and now the code added in this patch). I think the user 
should just understand the contract, which is that anything outside of the 
specified range gets rounded down.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-10-29 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r514508755



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java
##
@@ -57,6 +57,10 @@ public AddOffsetsToTxnRequest(Struct struct, short version) {
 this.data = new AddOffsetsToTxnRequestData(struct, version);
 }
 
+public AddOffsetsToTxnRequestData data() {
+return data;
+}

Review comment:
   Yea I definitely agree with wanting consistency. I'll stick to 1) in the 
PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9530: MINOR: Fix version verification in system test

2020-10-29 Thread GitBox


cadonna commented on pull request #9530:
URL: https://github.com/apache/kafka/pull/9530#issuecomment-718965450


   System tests run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4276/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514504593



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1688,14 +1694,39 @@ private long pollCurrentState(long currentTimeMs) 
throws IOException {
 }
 }
 
+private void pollListeners() {
+// Register any listeners added since the last poll
+while (!pendingListeners.isEmpty()) {

Review comment:
   I doubt we would use it in practice, though I guess it would open the 
door to changing roles dynamically, which might be interesting in the future. 
That said, it was simple to add and useful in testing since it gave me an easy 
way to initialize a state where a listener had not caught up.
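
   As a side note, the register-then-drain pattern in the hunk above can be sketched roughly as follows; the field and class names here are illustrative, not the actual `KafkaRaftClient` members:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ListenerRegistrySketch<L> {
    // Written from arbitrary caller threads.
    private final Queue<L> pendingListeners = new ConcurrentLinkedQueue<>();
    // Read and written only from the single polling thread.
    private final List<L> registeredListeners = new ArrayList<>();

    void register(L listener) {
        pendingListeners.add(listener);
    }

    // Called at the top of each poll iteration on the IO thread.
    void pollListeners() {
        L listener;
        while ((listener = pendingListeners.poll()) != null) {
            registeredListeners.add(listener);
        }
    }
}
{code}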





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9530: MINOR: Fix version verification in system test

2020-10-29 Thread GitBox


cadonna commented on pull request #9530:
URL: https://github.com/apache/kafka/pull/9530#issuecomment-718961222


   This fix needs to be cherry-picked to 2.7 and 2.6 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #9530: MINOR: Fix version verification in system test

2020-10-29 Thread GitBox


cadonna commented on pull request #9530:
URL: https://github.com/apache/kafka/pull/9530#issuecomment-718960604


   Call for review: @lct45 @ableegoldman @vvcephei 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #9530: MINOR: Fix version verification in system test

2020-10-29 Thread GitBox


cadonna opened a new pull request #9530:
URL: https://github.com/apache/kafka/pull/9530


   The system test StreamsUpgradeTest.test_version_probing_upgrade
   tries to verify the wrong version for version probing.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10662) Possible hanging test in 2.6 on JDK 11

2020-10-29 Thread John Roesler (Jira)
John Roesler created KAFKA-10662:


 Summary: Possible hanging test in 2.6 on JDK 11
 Key: KAFKA-10662
 URL: https://issues.apache.org/jira/browse/KAFKA-10662
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.6.1
Reporter: John Roesler
 Attachments: timeout-1.txt, timeout-2.txt, timeout-4.txt

While adding a Jenkinsfile to the 2.6 branch 
([https://github.com/apache/kafka/pull/9471]),

I observed the JDK 11 build specifically to hang, 3/5 times (and pass within a 
normal timeframe of 2.5 hours the other two times).

I haven't seen similar behavior on any other branch, so there may be something 
about the 2.6 codebase or the 2.6 tests themselves that interact poorly with 
Java 11.

 

I did some analysis on the failing results, and found that in all three hanging 
cases, all the tests that "STARTED" either "PASSED" or were "SKIPPED". So, I 
was not able to identify a specific culprit. I've attached the logs for these 
runs, in case they aid any investigation.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] senthilm-ms commented on pull request #9495: KAFKA-10642: Expose the real stack trace if any exception occurred during SSL Client Trust Verification in extension

2020-10-29 Thread GitBox


senthilm-ms commented on pull request #9495:
URL: https://github.com/apache/kafka/pull/9495#issuecomment-718944686


   > @rajinisivaram : What do you think? Thanks.
   
   @junrao @rajinisivaram - take a look at the stack trace mentioned in the 
JIRA bug where you don't see the extension/custom implementation stack trace



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


abbccdda commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r514482551



##
File path: core/src/main/scala/kafka/raft/TimingWheelExpirationService.scala
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.raft
+
+import java.util.concurrent.CompletableFuture
+
+import kafka.raft.TimingWheelExpirationService.TimerTaskCompletableFuture
+import kafka.utils.ShutdownableThread
+import kafka.utils.timer.{Timer, TimerTask}
+import org.apache.kafka.common.errors.TimeoutException
+import org.apache.kafka.raft.ExpirationService
+
+object TimingWheelExpirationService {
+  class TimerTaskCompletableFuture[T](override val delayMs: Long) extends 
CompletableFuture[T] with TimerTask {
+override def run(): Unit = {
+  completeExceptionally(new TimeoutException(
+s"Future failed to be completed before timeout of $delayMs ms was 
reached"))
+}
+  }
+}
+
+class TimingWheelExpirationService(timer: Timer) extends ExpirationService {
+  private val expirationReaper = new ExpiredOperationReaper()
+
+  expirationReaper.start()
+
+  override def await[T](timeoutMs: Long): CompletableFuture[T] = {
+val future = new TimerTaskCompletableFuture[T](timeoutMs)
+future.whenComplete { (_, _) =>
+  future.cancel()
+}
+timer.add(future)
+future
+  }
+
+  private class ExpiredOperationReaper extends ShutdownableThread(
+name = "raft-expiration-reaper", isInterruptible = false) {
+
+override def doWork(): Unit = {
+  timer.advanceClock(200L)

Review comment:
   Could we make 200L a constant?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #9495: KAFKA-10642: Expose the real stack trace if any exception occurred during SSL Client Trust Verification in extension

2020-10-29 Thread GitBox


junrao commented on pull request #9495:
URL: https://github.com/apache/kafka/pull/9495#issuecomment-718942968


   @rajinisivaram : What do you think? Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-10-29 Thread GitBox


abbccdda commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r513200010



##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/DataInputStreamReadable.java
##
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class DataInputStreamReadable implements Readable, Closeable {
+protected final DataInputStream input;
+
+public DataInputStreamReadable(DataInputStream input) {
+this.input = input;
+}
+
+@Override
+public byte readByte() {
+try {
+return input.readByte();
+} catch (IOException e) {
+throw new RuntimeException(e);
+}
+}
+
+@Override
+public short readShort() {
+try {

Review comment:
   nit: we could refactor out the try-catch logic.

##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java
##
@@ -113,11 +113,11 @@ protected HistogramSample newSample(long timeMs) {
 protected void update(Sample sample, MetricConfig config, double value, 
long timeMs) {
 final double boundedValue;
 if (value > max) {
-log.warn("Received value {} which is greater than max recordable 
value {}, will be pinned to the max value",
+log.debug("Received value {} which is greater than max recordable 
value {}, will be pinned to the max value",

Review comment:
   Why do we change these values to debug?

##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -369,6 +379,17 @@ public void transitionToLeader(long epochStartOffset) 
throws IOException {
 if (!candidateState.isVoteGranted())
 throw new IllegalStateException("Cannot become leader without 
majority votes granted");
 
+// Note that the leader does not retain the high watermark that was 
known
+// in the previous state. The reason it does not is to protect the 
monotonicity

Review comment:
   `it does not` could be removed.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,81 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);
 });
 }
 
-@Override
-public LeaderAndEpoch currentLeaderAndEpoch() {
-return quorum.leaderAndEpoch();
+private void maybeFireHandleCommit(long highWatermark) {
+maybeFireHandleCommit(listenerContexts, highWatermark);
+}
+
+private void maybeFireHandleCommit(List<ListenerContext> listenerContexts, long highWatermark) {
+// TODO: When there are multiple listeners, we can cache reads to 

[GitHub] [kafka] junrao commented on pull request #6915: KAFKA-8334 Executor to retry delayed operations failed to obtain lock

2020-10-29 Thread GitBox


junrao commented on pull request #6915:
URL: https://github.com/apache/kafka/pull/6915#issuecomment-718933493


   @yinchuanwang : The short answer is that the retry logic could cause a 
watcher event not to be triggered. If there is no future event triggering the 
same watcher, the corresponding delayed request may not be completed in time, 
leading to request timing out in the client.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514467652



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -210,65 +142,42 @@ public String toString() {
 }
 }
 
+/**
+ * We have to copy acks, timeout, transactionalId and partitionSizes from 
data since data maybe reset to eliminate
+ * the reference to ByteBuffer but those metadata are still useful.
+ */
 private final short acks;
 private final int timeout;
 private final String transactionalId;
-
-private final Map<TopicPartition, Integer> partitionSizes;
-
+// visible for testing
+final Map<TopicPartition, Integer> partitionSizes;
+private boolean hasTransactionalRecords = false;
+private boolean hasIdempotentRecords = false;
 // This is set to null by `clearPartitionRecords` to prevent unnecessary 
memory retention when a produce request is
 // put in the purgatory (due to client throttling, it can take a while 
before the response is sent).
 // Care should be taken in methods that use this field.
-private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
-private boolean hasTransactionalRecords = false;
-private boolean hasIdempotentRecords = false;
-
-private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) {
-super(ApiKeys.PRODUCE, version);
-this.acks = acks;
-this.timeout = timeout;
-
-this.transactionalId = transactionalId;
-this.partitionRecords = partitionRecords;
-this.partitionSizes = createPartitionSizes(partitionRecords);
+private volatile ProduceRequestData data;
 
-for (MemoryRecords records : partitionRecords.values()) {
-setFlags(records);
-}
-}
-
-private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
-Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size());
-for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
-result.put(entry.getKey(), entry.getValue().sizeInBytes());
-return result;
-}
-
-public ProduceRequest(Struct struct, short version) {
+public ProduceRequest(ProduceRequestData produceRequestData, short 
version) {
 super(ApiKeys.PRODUCE, version);
-partitionRecords = new HashMap<>();
-for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-Struct topicData = (Struct) topicDataObj;
-String topic = topicData.get(TOPIC_NAME);
-for (Object partitionResponseObj : 
topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-Struct partitionResponse = (Struct) partitionResponseObj;
-int partition = partitionResponse.get(PARTITION_ID);
-MemoryRecords records = (MemoryRecords) 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-setFlags(records);
-partitionRecords.put(new TopicPartition(topic, partition), 
records);
-}
-}
-partitionSizes = createPartitionSizes(partitionRecords);
-acks = struct.getShort(ACKS_KEY_NAME);
-timeout = struct.getInt(TIMEOUT_KEY_NAME);
-transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-}
-
-private void setFlags(MemoryRecords records) {
-Iterator<MutableRecordBatch> iterator = records.batches().iterator();
-MutableRecordBatch entry = iterator.next();
-hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-hasTransactionalRecords = hasTransactionalRecords || 
entry.isTransactional();
+this.data = produceRequestData;
+this.data.topicData().forEach(topicProduceData -> 
topicProduceData.partitions()
+.forEach(partitionProduceData -> {
+MemoryRecords records = 
MemoryRecords.readableRecords(partitionProduceData.records());
+Iterator<MutableRecordBatch> iterator = records.batches().iterator();
+MutableRecordBatch entry = iterator.next();
+hasIdempotentRecords = hasIdempotentRecords || 
entry.hasProducerId();

Review comment:
   Nevermind, I guess we have to do it here because the server needs to 
validate the request received from the client.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10661) Add resigned state to raft state machine to preserve leader/epoch information

2020-10-29 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10661:
---

 Summary: Add resigned state to raft state machine to preserve 
leader/epoch information
 Key: KAFKA-10661
 URL: https://issues.apache.org/jira/browse/KAFKA-10661
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


While working on KAFKA-10655, I realized we have a bug in the existing raft 
state initialization logic when the process shuts down as leader. After 
reinitializing, we retain the current epoch, but we discard the current leader 
status. This means that it is possible for the node to vote for another node in 
the same epoch that it was the leader of.

To fix this problem, I think we should add a separate "resigned" state. When 
re-initializing after being shut down as leader, we can enter the "resigned" 
state. This prevents us from voting for another candidate while still ensuring 
that a new election needs to be held.
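
Purely as an illustration of that intent (this is not the actual QuorumState code, and all names below are made up), the vote check after re-initialization would behave roughly like:

{code:java}
// Illustrative sketch only: a node that resigned leadership of the current epoch
// refuses to vote in that same epoch, so a candidate must first bump the epoch.
enum RaftRole { FOLLOWER, CANDIDATE, LEADER, RESIGNED }

final class ResignedVoteSketch {
    static boolean canGrantVote(RaftRole role, int localEpoch, int candidateEpoch) {
        if (role == RaftRole.RESIGNED && candidateEpoch <= localEpoch) {
            return false;
        }
        // Other standard Raft checks (log up-to-date, not already voted) omitted.
        return candidateEpoch >= localEpoch;
    }
}
{code}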



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9892) Producer state snapshot needs to be forced to disk

2020-10-29 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223094#comment-17223094
 ] 

Jun Rao commented on KAFKA-9892:


[~kbrajesh176]: Thanks for your interest. Just added you to Kafka contributor 
list. So, feel free to assign this jira to yourself.

> Producer state snapshot needs to be forced to disk
> --
>
> Key: KAFKA-9892
> URL: https://issues.apache.org/jira/browse/KAFKA-9892
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Jun Rao
>Priority: Major
>
> Currently, ProducerStateManager.writeSnapshot() only calls 
> fileChannel.close(), but not explicitly fileChannel.force(). It seems force() 
> is not guaranteed to be called on close(). 
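
For reference, a minimal sketch of the fix being described, i.e. forcing the channel before closing it. This is illustrative NIO usage only, not the actual ProducerStateManager.writeSnapshot() code:

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class SnapshotWriteSketch {
    static void writeDurably(Path snapshotFile, ByteBuffer buffer) throws IOException {
        try (FileChannel channel = FileChannel.open(snapshotFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            while (buffer.hasRemaining()) {
                channel.write(buffer);
            }
            // close() alone does not guarantee the bytes reach disk; force() does the fsync.
            channel.force(true);
        }
    }
}
{code}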



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei merged pull request #9426: MINOR: Fix flaky shouldRejectNonExistentStoreName

2020-10-29 Thread GitBox


vvcephei merged pull request #9426:
URL: https://github.com/apache/kafka/pull/9426


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9892) Producer state snapshot needs to be forced to disk

2020-10-29 Thread Brajesh Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223070#comment-17223070
 ] 

 Brajesh Kumar commented on KAFKA-9892:
---

[~junrao]

[~ijuma]

Can I pick this up please? It seems to be a good beginner bug.

> Producer state snapshot needs to be forced to disk
> --
>
> Key: KAFKA-9892
> URL: https://issues.apache.org/jira/browse/KAFKA-9892
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Jun Rao
>Priority: Major
>
> Currently, ProducerStateManager.writeSnapshot() only calls 
> fileChannel.close(), but not explicitly fileChannel.force(). It seems force() 
> is not guaranteed to be called on close(). 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9426: MINOR: Fix flaky shouldRejectNonExistentStoreName

2020-10-29 Thread GitBox


vvcephei commented on pull request #9426:
URL: https://github.com/apache/kafka/pull/9426#issuecomment-718900823


   The only build failure was:
   `Build / JDK 11 / 
org.apache.kafka.streams.integration.StreamTableJoinTopologyOptimizationIntegrationTest.shouldDoStreamTableJoinWithDifferentNumberOfPartitions[Optimization
 = none]`
   
   Which seems to have been caused by the OS deleting some files from the `tmp` 
directory during the run.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-10638.
--
Resolution: Fixed

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> Observed:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state 
> store source-table because the stream thread is PARTITIONS_ASSIGNED, not 
> RUNNING
>   at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
>   at 
> org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
>   at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
>   at 

[jira] [Updated] (KAFKA-10638) QueryableStateIntegrationTest fails due to stricter store checking

2020-10-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10638:
-
Fix Version/s: (was: 2.7.1)
   (was: 2.8.0)
   2.7.0

> QueryableStateIntegrationTest fails due to stricter store checking
> --
>
> Key: KAFKA-10638
> URL: https://issues.apache.org/jira/browse/KAFKA-10638
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.7.0
>
>
> Observed:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get state 
> store source-table because the stream thread is PARTITIONS_ASSIGNED, not 
> RUNNING
>   at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:81)
>   at 
> org.apache.kafka.streams.state.internals.WrappingStoreProvider.stores(WrappingStoreProvider.java:50)
>   at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:52)
>   at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:200)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> 

[jira] [Commented] (KAFKA-9948) Gradle Issue

2020-10-29 Thread Brajesh Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223044#comment-17223044
 ] 

 Brajesh Kumar commented on KAFKA-9948:
---

[~dulvinw]

Which command are you using to build Kafka?

As suggested by [~chia7712], you should use "*./gradlew*", or you can try running 
"*gradle build*" instead of just "*gradle*" to build the Kafka code base.

> Gradle Issue
> 
>
> Key: KAFKA-9948
> URL: https://issues.apache.org/jira/browse/KAFKA-9948
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.4.1
> Environment: gradle -v
> 
> Gradle 6.0.1
> 
> Build time:   2019-11-18 20:25:01 UTC
> Revision: fad121066a68c4701acd362daf4287a7c309a0f5
> Kotlin:   1.3.50
> Groovy:   2.5.8
> Ant:  Apache Ant(TM) version 1.10.7 compiled on September 1 2019
> JVM:  1.8.0_152 (Oracle Corporation 25.152-b16)
> OS:   Mac OS X 10.15.4 x86_64
>Reporter: Dulvin Witharane
>Priority: Blocker
>
> Can't get Gradle to build kafka.
>  
> Build file '/Users/dulvin/Documents/Work/git/kafka/build.gradle' line: 457
> A problem occurred evaluating root project 'kafka'.
> > Could not create task ':clients:spotbugsMain'.
>  > Could not create task of type 'SpotBugsTask'.
>  > Could not create an instance of type 
> com.github.spotbugs.internal.SpotBugsReportsImpl.
>  > 
> org.gradle.api.reporting.internal.TaskReportContainer.<init>(Ljava/lang/Class;Lorg/gradle/api/Task;)V
>  
> The above error is thrown



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9948) Gradle Issue

2020-10-29 Thread Brajesh Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9948?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223044#comment-17223044
 ] 

 Brajesh Kumar edited comment on KAFKA-9948 at 10/29/20, 5:02 PM:
--

[~dulvinw]

Which command are you using to build Kafka?

As suggested by [~chia7712], you should use "*./gradlew*", or you can try running 
"*gradle build*" instead of just "*gradle*" to build the Kafka code base. On my 
Amazon Linux system "*gradle build*" works fine too.


was (Author: kbrajesh176):
[~dulvinw]

Which command are you using to build Kafka?

As suggested by [~chia7712], you should use "*./gradlew*", or you can try running 
"*gradle build*" instead of just "*gradle*" to build the Kafka code base.

> Gradle Issue
> 
>
> Key: KAFKA-9948
> URL: https://issues.apache.org/jira/browse/KAFKA-9948
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.4.1
> Environment: gradle -v
> 
> Gradle 6.0.1
> 
> Build time:   2019-11-18 20:25:01 UTC
> Revision: fad121066a68c4701acd362daf4287a7c309a0f5
> Kotlin:   1.3.50
> Groovy:   2.5.8
> Ant:  Apache Ant(TM) version 1.10.7 compiled on September 1 2019
> JVM:  1.8.0_152 (Oracle Corporation 25.152-b16)
> OS:   Mac OS X 10.15.4 x86_64
>Reporter: Dulvin Witharane
>Priority: Blocker
>
> Can't get Gradle to build kafka.
>  
> Build file '/Users/dulvin/Documents/Work/git/kafka/build.gradle' line: 457
> A problem occurred evaluating root project 'kafka'.
> > Could not create task ':clients:spotbugsMain'.
>  > Could not create task of type 'SpotBugsTask'.
>  > Could not create an instance of type 
> com.github.spotbugs.internal.SpotBugsReportsImpl.
>  > 
> org.gradle.api.reporting.internal.TaskReportContainer.<init>(Ljava/lang/Class;Lorg/gradle/api/Task;)V
>  
> The above error is thrown



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-10-29 Thread GitBox


vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-718889494


   Hey @dongjinleekr , just a quick check-in: do you plan to take a look at the 
test failures?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10373) Kafka Reassign Partition is stuck with Java OutOfMemory error

2020-10-29 Thread Brajesh Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223041#comment-17223041
 ] 

 Brajesh Kumar commented on KAFKA-10373:


Hi, what is your environment configuration (Linux/macOS)? Do you have any ACLs 
set on the cluster?

 

Can you please share the steps to reproduce the issue? I am unable to reproduce 
the issue on Amazon Linux.

 

 
{code:java}
 
dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper 
localhost:2181 --reassignment-json-file topics-reassignment.json 
--bootstrap-server localhost:9093 --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
(20-10-29 16:55:05) <0> [~/kafka/kafka_2.11-2.2.1/bin]
dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper 
localhost:2181 --reassignment-json-file topics-reassignment.json 
--bootstrap-server localhost:9093 --verify
Status of partition reassignment:
Reassignment of partition test-1 completed successfully
Reassignment of partition test-3 completed successfully
Reassignment of partition test-0 completed successfully
Reassignment of partition test-2 completed successfully{code}
 

> Kafka Reassign Partition is stuck with Java OutOfMemory error
> -
>
> Key: KAFKA-10373
> URL: https://issues.apache.org/jira/browse/KAFKA-10373
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.1
>Reporter: azher khan
>Priority: Major
>
> Hi Team,
> While trying to run the Kafka script to reassign partitions of an existing 
> topic, we are seeing a Java OutOfMemory issue.
>  
> The heap for Kafka is set to "-Xmx1G -Xms1G" on the kafka broker.
>  
> {code:java}
> /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 
> --reassignment-json-file topic_kafka_topic1_reassignment.json 
> --bootstrap-server kafkabroker1:9092 --verify
> Status of partition reassignment:
> [2020-08-07 XX:XX:XX,] ERROR Uncaught exception in thread 
> 'kafka-admin-client-thread | reassign-partitions-tool': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
>  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>  at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>  at 
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>  at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>  at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
>  at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
>  at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
>  at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>  at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
>  at java.lang.Thread.run(Thread.java:748)
> Reassignment of partition kafka_topic1-0 is still in progress
> Reassignment of partition kafka_topic1-1 is still in progress
> Reassignment of partition kafka_topic1-2 is still in progress{code}
>  
> Retried the above command after removing the "reassign_partitions" from 
> zookeeper as suggested but we are seeing the same error.
>  
>  
> {code:java}
> [zk: localhost:2181(CONNECTED) 5] delete /admin/reassign_partitions
> [zk: localhost:2181(CONNECTED) 7] ls /admin
> [delete_topics] 
> {code}
>  
> Would highly appreciate your advice,
> Thank you in advance,
>  
> Regards,
> Azher Khan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10373) Kafka Reassign Partition is stuck with Java OutOfMemory error

2020-10-29 Thread Brajesh Kumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17223041#comment-17223041
 ] 

 Brajesh Kumar edited comment on KAFKA-10373 at 10/29/20, 4:59 PM:
---

Hi, what is your environment configuration (Linux/macOS)? Do you have any ACLs 
set on the cluster?

 

[~azherullahkhan] Can you please share the steps to reproduce the issue? I am 
unable to reproduce the issue on Amazon Linux.

 

 
{code:java}
 
dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper 
localhost:2181 --reassignment-json-file topics-reassignment.json 
--bootstrap-server localhost:9093 --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
(20-10-29 16:55:05) <0> [~/kafka/kafka_2.11-2.2.1/bin]
dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper 
localhost:2181 --reassignment-json-file topics-reassignment.json 
--bootstrap-server localhost:9093 --verify
Status of partition reassignment:
Reassignment of partition test-1 completed successfully
Reassignment of partition test-3 completed successfully
Reassignment of partition test-0 completed successfully
Reassignment of partition test-2 completed successfully{code}
 


was (Author: kbrajesh176):
Hi, what is your environment configuration (Linux/macOS)? Do you have any ACLs 
set on the cluster?

 

Can you please share the steps to reproduce the issue? I am unable to reproduce 
the issue on Amazon Linux.

 

 
{code:java}
 
dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper 
localhost:2181 --reassignment-json-file topics-reassignment.json 
--bootstrap-server localhost:9093 --execute
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test","partition":2,"replicas":[2,1],"log_dirs":["any","any"]},{"topic":"test","partition":1,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":3,"replicas":[1,2],"log_dirs":["any","any"]},{"topic":"test","partition":0,"replicas":[2,1],"log_dirs":["any","any"]}]}
Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
(20-10-29 16:55:05) <0> [~/kafka/kafka_2.11-2.2.1/bin]
dev-dsk-brjkumar-1c-3f1604a7 % ./kafka-reassign-partitions.sh --zookeeper 
localhost:2181 --reassignment-json-file topics-reassignment.json 
--bootstrap-server localhost:9093 --verify
Status of partition reassignment:
Reassignment of partition test-1 completed successfully
Reassignment of partition test-3 completed successfully
Reassignment of partition test-0 completed successfully
Reassignment of partition test-2 completed successfully{code}
 

> Kafka Reassign Partition is stuck with Java OutOfMemory error
> -
>
> Key: KAFKA-10373
> URL: https://issues.apache.org/jira/browse/KAFKA-10373
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.1
>Reporter: azher khan
>Priority: Major
>
> Hi Team,
> While trying to run the Kafka script to reassign partitions of an existing 
> topic, we are seeing a Java OutOfMemory issue.
>  
> The heap for Kafka is set to "-Xmx1G -Xms1G" on the kafka broker.
>  
> {code:java}
> /opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 
> --reassignment-json-file topic_kafka_topic1_reassignment.json 
> --bootstrap-server kafkabroker1:9092 --verify
> Status of partition reassignment:
> [2020-08-07 XX:XX:XX,] ERROR Uncaught exception in thread 
> 'kafka-admin-client-thread | reassign-partitions-tool': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
>  at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>  at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>  at 
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>  at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>  at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
>  at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
>  at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
>  at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
>  at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
>  at 
> 

[GitHub] [kafka] vvcephei merged pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

2020-10-29 Thread GitBox


vvcephei merged pull request #9521:
URL: https://github.com/apache/kafka/pull/9521


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

2020-10-29 Thread GitBox


vvcephei commented on pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#issuecomment-718885978


   The tests passed, and I answered the two review questions. I'll go ahead and 
merge.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9467: KAFKA-10515: Properly initialize nullable Serdes with default values

2020-10-29 Thread GitBox


vvcephei commented on pull request #9467:
URL: https://github.com/apache/kafka/pull/9467#issuecomment-718882348


   _Finally_ merged that Jenkinsfile PR (seeing a lot of timeouts, but decided 
just to go ahead and merge it).
   
   Merged in the 2.6 branch, and now the tests are running! We should hopefully 
get to merge this today.
   
   Thanks again, @thake 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-29 Thread GitBox


vvcephei merged pull request #9471:
URL: https://github.com/apache/kafka/pull/9471


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10660) Poll time out logstash

2020-10-29 Thread David (Jira)
David created KAFKA-10660:
-

 Summary: Poll time out logstash
 Key: KAFKA-10660
 URL: https://issues.apache.org/jira/browse/KAFKA-10660
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.2.1
 Environment: Non Production
Reporter: David


I am getting the message below in the logstash log from the kafka input, which I 
believe means I need to increase max.poll.interval.ms (I think the default is 3).

 

This member will leave the group because consumer poll timeout has expired. 
This means the time between subsequent calls to poll() was longer than the 
configured max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time processing messages. You can address this either by 
increasing max.poll.interval.ms or by reducing the maximum size of batches 
returned in poll() with max.poll.records
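
As a rough illustration, these are the two settings that message points at, shown here as plain Java consumer properties. The values are examples only, and the logstash kafka input exposes similarly named options:

{code:java}
import java.util.Properties;

class PollTimeoutTuningSketch {
    static Properties consumerOverrides() {
        Properties props = new Properties();
        // Allow more time between successive poll() calls...
        props.put("max.poll.interval.ms", "600000");
        // ...and/or hand back fewer records per poll so each batch processes faster.
        props.put("max.poll.records", "100");
        return props;
    }
}
{code}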



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on pull request #9529: Revert initial principal from 2.7

2020-10-29 Thread GitBox


abbccdda commented on pull request #9529:
URL: https://github.com/apache/kafka/pull/9529#issuecomment-718871156


   @bbejeck @hachikuji for a review



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #9529: Revert initial principal from 2.7

2020-10-29 Thread GitBox


abbccdda opened a new pull request #9529:
URL: https://github.com/apache/kafka/pull/9529


   Need to make sure 2.7 doesn't bump the request header unexpectedly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9522: MINOR: revert initial principal PR

2020-10-29 Thread GitBox


abbccdda commented on pull request #9522:
URL: https://github.com/apache/kafka/pull/9522#issuecomment-718869803


   Reopening this PR to merge to 2.7



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck opened a new pull request #9528: MINOR: Add releaseTarGz to args for building docs

2020-10-29 Thread GitBox


bbejeck opened a new pull request #9528:
URL: https://github.com/apache/kafka/pull/9528


   This PR adds the `releaseTarGz` command back into the 
`release.py::command_stage_docs` method when building docs for staging
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-10-29 Thread GitBox


hachikuji commented on pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#issuecomment-718828042


   @chia7712 One thing that would be useful is running the producer-performance 
test, just to make sure the performance is in line. It might be worth checking 
flame graphs as well.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-10-29 Thread GitBox


hachikuji commented on a change in pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#discussion_r514326022



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
##
@@ -210,65 +142,42 @@ public String toString() {
         }
     }
 
+    /**
+     * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate
+     * the reference to ByteBuffer but those metadata are still useful.
+     */
     private final short acks;
     private final int timeout;
     private final String transactionalId;
-
-    private final Map<TopicPartition, Integer> partitionSizes;
-
+    // visible for testing
+    final Map<TopicPartition, Integer> partitionSizes;
+    private boolean hasTransactionalRecords = false;
+    private boolean hasIdempotentRecords = false;
     // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is
     // put in the purgatory (due to client throttling, it can take a while before the response is sent).
     // Care should be taken in methods that use this field.
-    private volatile Map<TopicPartition, MemoryRecords> partitionRecords;
-    private boolean hasTransactionalRecords = false;
-    private boolean hasIdempotentRecords = false;
-
-    private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) {
-        super(ApiKeys.PRODUCE, version);
-        this.acks = acks;
-        this.timeout = timeout;
-
-        this.transactionalId = transactionalId;
-        this.partitionRecords = partitionRecords;
-        this.partitionSizes = createPartitionSizes(partitionRecords);
+    private volatile ProduceRequestData data;
 
-        for (MemoryRecords records : partitionRecords.values()) {
-            setFlags(records);
-        }
-    }
-
-    private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) {
-        Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size());
-        for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet())
-            result.put(entry.getKey(), entry.getValue().sizeInBytes());
-        return result;
-    }
-
-    public ProduceRequest(Struct struct, short version) {
+    public ProduceRequest(ProduceRequestData produceRequestData, short version) {
         super(ApiKeys.PRODUCE, version);
-        partitionRecords = new HashMap<>();
-        for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) {
-            Struct topicData = (Struct) topicDataObj;
-            String topic = topicData.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                int partition = partitionResponse.get(PARTITION_ID);
-                MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                setFlags(records);
-                partitionRecords.put(new TopicPartition(topic, partition), records);
-            }
-        }
-        partitionSizes = createPartitionSizes(partitionRecords);
-        acks = struct.getShort(ACKS_KEY_NAME);
-        timeout = struct.getInt(TIMEOUT_KEY_NAME);
-        transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null);
-    }
-
-    private void setFlags(MemoryRecords records) {
-        Iterator<MutableRecordBatch> iterator = records.batches().iterator();
-        MutableRecordBatch entry = iterator.next();
-        hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();
-        hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional();
+        this.data = produceRequestData;
+        this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions()
+            .forEach(partitionProduceData -> {
+                MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records());
+                Iterator<MutableRecordBatch> iterator = records.batches().iterator();
+                MutableRecordBatch entry = iterator.next();
+                hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId();

Review comment:
   Would it make sense to move this to the builder where we are already 
doing a pass over the partitions?
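
   As an illustration, a sketch of what that suggestion could look like: compute the flags during the single pass the builder already makes over the partitions. The accessor names (`topicData()`, `partitions()`, `records()`) simply mirror the diff above, so this only targets the state of this PR branch; it is a sketch of the idea, not the code that was merged.

   ```java
// Sketch only: compute the transactional/idempotent flags once, in the pass
// over the partitions that ProduceRequest.Builder already performs, instead of
// re-walking the data in the ProduceRequest constructor.
import java.util.Iterator;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;

final class ProduceFlagsSketch {
    // Returns {hasIdempotentRecords, hasTransactionalRecords}.
    static boolean[] computeFlags(ProduceRequestData data) {
        boolean hasIdempotentRecords = false;
        boolean hasTransactionalRecords = false;
        for (ProduceRequestData.TopicProduceData topicData : data.topicData()) {
            for (ProduceRequestData.PartitionProduceData partitionData : topicData.partitions()) {
                MemoryRecords records = MemoryRecords.readableRecords(partitionData.records());
                Iterator<MutableRecordBatch> batches = records.batches().iterator();
                // As in the diff above, only the first batch needs to be inspected.
                MutableRecordBatch firstBatch = batches.next();
                hasIdempotentRecords |= firstBatch.hasProducerId();
                hasTransactionalRecords |= firstBatch.isTransactional();
            }
        }
        return new boolean[] {hasIdempotentRecords, hasTransactionalRecords};
    }
}
   ```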

##
File path: clients/src/main/resources/common/message/ProduceRequest.json
##
@@ -33,21 +33,21 @@
   "validVersions": "0-8",
   "flexibleVersions": "none",
   "fields": [
-    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId",
+    { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "entityType": "transactionalId",
       "about": "The transactional ID, or null if the producer is not transactional." },
     { "name": "Acks", "type": "int16", "versions": "0+",
       "about": "The number of acknowledgments the producer requires the leader to have received before considering a 

[GitHub] [kafka] vvcephei commented on pull request #9471: MINOR: Add Jenkinsfile to 2.6

2020-10-29 Thread GitBox


vvcephei commented on pull request #9471:
URL: https://github.com/apache/kafka/pull/9471#issuecomment-718826291


   Ok, I've run it a total of 5 times now, and it has passed two times and 
timed out three times.
   
   I did some analysis on the timed out logs, and found that every test that 
started either passed or got skipped. I was hoping to find a test that hung, 
but no luck.
   
   For example, I downloaded the logs for run 1, which timed out, as 
`timeout-1.txt`:
   ```bash
   [john@arcturus Downloads]$ cat timeout-1.txt | grep STARTED | sed 's|^\[\S*] ||' | sed 's/ STARTED//' | sort > /tmp/1-started
   [john@arcturus Downloads]$ cat timeout-1.txt | grep PASSED | sed 's|^\[\S*] ||' | sed 's/ PASSED//' | sort > /tmp/1-passed
   [john@arcturus Downloads]$ cat timeout-1.txt | grep SKIPPED | sed 's|^\[\S*] ||' | sed 's/ SKIPPED//' | sort > /tmp/1-skipped
   [john@arcturus Downloads]$ wc -l /tmp/1-skipped /tmp/1-passed 
61 /tmp/1-skipped
 10891 /tmp/1-passed
 10952 total
   [john@arcturus Downloads]$ wc -l /tmp/1-started 
   10952 /tmp/1-started
   ```
   
   Since all the other builds on all the other branches are working with 
effectively the same Jenkinsfile, I strongly suspect that the problem is 
actually in the 2.6 codebase rather than in this PR itself.
   
   I think we should go ahead and merge this in some form. I could see:
   1. Just merge this as-is and file a Jira ticket to investigate why java 11 
builds time out on 2.6.
   2. Exclude java 11 from the jenkinsfile and go ahead and merge with just 
java 8 and 14.
   
   WDYT, @omkreddy @jolshan @ijuma ?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

2020-10-29 Thread GitBox


vvcephei commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r514325710



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -153,51 +179,75 @@ public void shouldQuerySpecificActivePartitionStores() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        //key belongs to this partition
-        final int keyPartition = keyQueryMetadata.partition();
-
-        //key doesn't belongs to this partition
-        final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
-            StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-                .withPartition(keyPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
-        if (kafkaStreams1IsActive) {
-            store1 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store2 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
-
-        if (kafkaStreams1IsActive) {
-            assertThat(store1, is(notNullValue()));
-            assertThat(store2, is(nullValue()));
-        } else {
-            assertThat(store2, is(notNullValue()));
-            assertThat(store1, is(nullValue()));
-        }
+        until(() -> {
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            //key belongs to this partition
+            final int keyPartition = keyQueryMetadata.partition();
+
+            //key doesn't belongs to this partition
+            final int keyDontBelongPartition = (keyPartition == 0) ? 1 : 0;
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyPartition);
+            ReadOnlyKeyValueStore<Integer, Integer> store1 = null;
+            ReadOnlyKeyValueStore<Integer, Integer> store2 = null;
+            if (kafkaStreams1IsActive) {
+                store1 = getStore(kafkaStreams1, storeQueryParam);
+            } else {
+                store2 = getStore(kafkaStreams2, storeQueryParam);
+            }
+
+            if (kafkaStreams1IsActive) {
+                assertThat(store1, is(notNullValue()));
+                assertThat(store2, is(nullValue()));
+            } else {
+                assertThat(store2, is(notNullValue()));
+                assertThat(store1, is(nullValue()));
+            }
+
+            // Assert that only active for a specific requested partition serves key if stale stores and not enabled
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+
+            final StoreQueryParameters<ReadOnlyKeyValueStore<Integer, Integer>> storeQueryParam2 =
+                StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, keyValueStore())
+                    .withPartition(keyDontBelongPartition);
 
-        // Assert that only active for a specific requested partition serves key if stale stores and not enabled
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
 
-        storeQueryParam = StoreQueryParameters.<ReadOnlyKeyValueStore<Integer, Integer>>fromNameAndType(TABLE_NAME, QueryableStoreTypes.keyValueStore())
-            .withPartition(keyDontBelongPartition);
-        ReadOnlyKeyValueStore<Integer, Integer> store3 = null;
-        ReadOnlyKeyValueStore<Integer, Integer> store4 = null;
-        if (!kafkaStreams1IsActive) {
-            store3 = IntegrationTestUtils.getStore(kafkaStreams1, storeQueryParam);
-        } else {
-            store4 = IntegrationTestUtils.getStore(kafkaStreams2, storeQueryParam);
-        }
 
-        // Assert that key is not served when wrong specific partition is requested
-        // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
-        // So, in that case, store3 would be null and the store4 would not return the value for key as wrong partition was requested
-        assertThat(kafkaStreams1IsActive ? store4.get(key) : store3.get(key), is(nullValue()));
+            try {
+                // Assert that key is not served when wrong specific partition is requested
+                // If kafkaStreams1 is active for keyPartition, kafkaStreams2 would be active for keyDontBelongPartition
+  

[GitHub] [kafka] vvcephei commented on a change in pull request #9521: KAFKA-10638: Fix QueryableStateIntegrationTest

2020-10-29 Thread GitBox


vvcephei commented on a change in pull request #9521:
URL: https://github.com/apache/kafka/pull/9521#discussion_r514323639



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java
##
@@ -117,17 +126,34 @@ public void shouldQueryOnlyActivePartitionStoresByDefault() throws Exception {
 
         // Assert that all messages in the first batch were processed in a timely manner
         assertThat(semaphore.tryAcquire(batch1NumMessages, 60, TimeUnit.SECONDS), is(equalTo(true)));
-        final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
-
-        final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = QueryableStoreTypes.keyValueStore();
-        final ReadOnlyKeyValueStore<Integer, Integer> store1 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
-        final ReadOnlyKeyValueStore<Integer, Integer> store2 = IntegrationTestUtils.getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
-
-        final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
-
-        // Assert that only active is able to query for a key by default
-        assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
-        assertThat(kafkaStreams1IsActive ? store2.get(key) : store1.get(key), is(nullValue()));
+        until(() -> {
+
+            final KeyQueryMetadata keyQueryMetadata = kafkaStreams1.queryMetadataForKey(TABLE_NAME, key, (topic, somekey, value, numPartitions) -> 0);
+
+            final QueryableStoreType<ReadOnlyKeyValueStore<Integer, Integer>> queryableStoreType = keyValueStore();
+            final ReadOnlyKeyValueStore<Integer, Integer> store1 = getStore(TABLE_NAME, kafkaStreams1, queryableStoreType);
+            final ReadOnlyKeyValueStore<Integer, Integer> store2 = getStore(TABLE_NAME, kafkaStreams2, queryableStoreType);
+
+            final boolean kafkaStreams1IsActive = (keyQueryMetadata.activeHost().port() % 2) == 1;
+
+            // Assert that only active is able to query for a key by default
+            assertThat(kafkaStreams1IsActive ? store1.get(key) : store2.get(key), is(notNullValue()));
+            try {
+                if (kafkaStreams1IsActive) {
+                    assertThat(store2.get(key), is(nullValue()));
+                } else {
+                    assertThat(store1.get(key), is(nullValue()));
+                }
+                return true;
+            } catch (final InvalidStateStoreException exception) {

Review comment:
   I wanted to keep the concerns separate, so that unexpected exceptions 
would cause the test to fail fast. The idea is that `until` is the inverse of 
`while`, namely, it just loops as long as the condition evaluates to `false`. 
If the condition throws an exception, then the loop also throws, just like the 
real `while` loop.
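
   For readers unfamiliar with that helper, here is a minimal sketch of an `until` with the semantics described above; the real helper in the Kafka test utilities may have a different signature.

   ```java
// Sketch only: loop while the condition is false, bounded by a timeout, and
// let any exception thrown by the condition propagate so the caller fails fast.
import java.time.Duration;
import java.util.function.Supplier;

public final class UntilSketch {
    static void until(final Duration timeout, final Supplier<Boolean> condition) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (!condition.get()) {  // inverse of `while`: keep looping as long as it evaluates to false
            if (System.currentTimeMillis() > deadline) {
                throw new AssertionError("condition not met within " + timeout);
            }
            Thread.sleep(100L);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final long start = System.currentTimeMillis();
        until(Duration.ofSeconds(5), () -> System.currentTimeMillis() - start > 500);
        System.out.println("condition became true");
    }
}
   ```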





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




  1   2   >