[GitHub] [kafka] chia7712 commented on pull request #9767: KAFKA-10047: Remove unnecessary widening of (int to long) scope in FloatSerializer.

2020-12-18 Thread GitBox


chia7712 commented on pull request #9767:
URL: https://github.com/apache/kafka/pull/9767#issuecomment-748433375


   Also, KAFKA-10047 already has a PR 
(https://github.com/apache/kafka/pull/9351). If you want to create a PR for it, 
it would be better to ask the author of that PR (or the reporter of the Jira) 
whether they are still working on it. If they are not, it is fine to take over 
the issue.
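
   For context, the "widening" in the PR title refers to storing the result of 
`Float.floatToRawIntBits` (an `int`) in a `long` before slicing out bytes. A 
minimal standalone sketch of the narrowed version (the class and method names 
here are illustrative, not Kafka's exact source):

```java
// Illustrative sketch of the change discussed in KAFKA-10047 (not the exact
// Kafka source): Float.floatToRawIntBits already returns an int, so the bit
// pattern can be sliced into big-endian bytes without widening it to a long.
class FloatSerializerSketch {
    static byte[] serialize(Float data) {
        if (data == null)
            return null;
        int bits = Float.floatToRawIntBits(data); // int is wide enough
        return new byte[] {
            (byte) (bits >>> 24),
            (byte) (bits >>> 16),
            (byte) (bits >>> 8),
            (byte) bits
        };
    }
}
```

The behavior is unchanged; only the intermediate type is narrowed.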



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9767: KAFKA-10047: Remove unnecessary widening of (int to long) scope in FloatSerializer.

2020-12-18 Thread GitBox


chia7712 commented on pull request #9767:
URL: https://github.com/apache/kafka/pull/9767#issuecomment-748432898


   @tguruprasad It would be better to ping the author or active Kafka 
committers to review the code :)







[GitHub] [kafka] chia7712 commented on pull request #9520: MINOR: replace test "expected" parameter by assertThrows

2020-12-18 Thread GitBox


chia7712 commented on pull request #9520:
URL: https://github.com/apache/kafka/pull/9520#issuecomment-748432641


   @ijuma Any thoughts on this PR?







[GitHub] [kafka] chia7712 commented on a change in pull request #9758: MINOR: remove FetchResponse.AbortedTransaction and redundant construc…

2020-12-18 Thread GitBox


chia7712 commented on a change in pull request #9758:
URL: https://github.com/apache/kafka/pull/9758#discussion_r546203915



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -777,12 +775,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
 
 def maybeConvertFetchedData(tp: TopicPartition,

Review comment:
   @ijuma This is the only case where we create `LazyDownConversionRecords` in 
production. With this PR, this case can get rid of the generic 
`FetchResponse.PartitionData`. Hence, we can remove the type parameter from 
`FetchResponse.PartitionData` after this PR lands in trunk.
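
   To illustrate the point with simplified stand-in classes (not the real 
Kafka types): a type parameter on `PartitionData` earns its keep only while 
some call site instantiates it with a records type other than `MemoryRecords`; 
once the last such use is gone, the non-generic form suffices:

```java
// Simplified stand-ins for the real Kafka types, for illustration only.
interface BaseRecords { int sizeInBytes(); }

final class MemoryRecords implements BaseRecords {
    private final int size;
    MemoryRecords(int size) { this.size = size; }
    public int sizeInBytes() { return size; }
}

// Generic form: needed only while more than one records type flows through it.
final class GenericPartitionData<T extends BaseRecords> {
    final T records;
    GenericPartitionData(T records) { this.records = records; }
}

// Non-generic form: possible once every remaining use site passes MemoryRecords.
final class PartitionData {
    final MemoryRecords records;
    PartitionData(MemoryRecords records) { this.records = records; }
}
```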









[GitHub] [kafka] chia7712 opened a new pull request #9768: HOTFIX: fix failed ControllerChannelManagerTest#testUpdateMetadataReq…

2020-12-18 Thread GitBox


chia7712 opened a new pull request #9768:
URL: https://github.com/apache/kafka/pull/9768


   as title
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] tguruprasad commented on pull request #9767: KAFKA-10047: Remove unnecessary widening of (int to long) scope in FloatSerializer.

2020-12-18 Thread GitBox


tguruprasad commented on pull request #9767:
URL: https://github.com/apache/kafka/pull/9767#issuecomment-748423867


   Apologies for spamming the folks who were tagged. As mentioned in [this 
wiki](https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes), 
I followed GitHub's automatic suggestions when tagging reviewers.







[jira] [Resolved] (KAFKA-10547) Add topic IDs to MetadataResponse

2020-12-18 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10547?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming resolved KAFKA-10547.

Resolution: Done

> Add topic IDs to MetadataResponse
> -
>
> Key: KAFKA-10547
> URL: https://issues.apache.org/jira/browse/KAFKA-10547
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: dengziming
>Priority: Major
>
> Will be able to use TopicDescription to identify the topic ID



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #9766: KAFKA-10864: Convert end txn marker schema to use auto-generated protocol

2020-12-18 Thread GitBox


dengziming commented on pull request #9766:
URL: https://github.com/apache/kafka/pull/9766#issuecomment-748414238


   > Thanks for the PR. One high-level question I have is whether the new 
auto-generated protocol format is exactly the same as the old hard-coded schema 
format, since this field is used inter-broker. Could you write a test to prove 
that they can translate between each other?
   
   Thank you. I verified locally that the two schemas can translate between 
each other; I have now added that check to `EndTransactionMarkerTest`.
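
   The kind of check described here can be sketched as a byte-for-byte round 
trip between the two serializers. The field layout below (int16 version 
followed by int32 coordinator epoch) is an assumption for illustration, not 
the verified Kafka wire format, and both writers are stand-ins:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

class EndTxnMarkerCompatSketch {
    // Hand-coded layout (assumed for illustration): int16 version, int32 coordinatorEpoch.
    static byte[] writeOld(short version, int coordinatorEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putShort(version);
        buf.putInt(coordinatorEpoch);
        return buf.array();
    }

    // Stand-in for the auto-generated serializer: for version 0 it must emit
    // identical bytes, since the marker is read across broker versions.
    static byte[] writeGenerated(short version, int coordinatorEpoch) {
        ByteBuffer buf = ByteBuffer.allocate(6);
        buf.putShort(version);
        buf.putInt(coordinatorEpoch);
        return buf.array();
    }

    // The compatibility test asserts the two encodings agree byte-for-byte.
    static boolean sameBytes(int coordinatorEpoch) {
        return Arrays.equals(writeOld((short) 0, coordinatorEpoch),
                             writeGenerated((short) 0, coordinatorEpoch));
    }
}
```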







[GitHub] [kafka] dengziming commented on a change in pull request #9766: KAFKA-10864: Convert end txn marker schema to use auto-generated protocol

2020-12-18 Thread GitBox


dengziming commented on a change in pull request #9766:
URL: https://github.com/apache/kafka/pull/9766#discussion_r546185834



##
File path: clients/src/main/resources/common/message/EndTxnMarker.json
##
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "EndTxnMarker",
+  "validVersions": "0",
+  "fields": [
+{ "name": "CoordinatorEpoch", "type": "int32", "versions": "0+"}

Review comment:
   Initially I tried to use flexible versions here, but the result is not the 
same as the old hard-coded schema format. Could we bump the version and support 
flexible versions starting from version 1?
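
   If the version were bumped, supporting tagged fields only from the new 
version would look roughly like this (hypothetical fragment; the version 
numbers are an assumption for illustration, not a decision made in this 
thread):

```json
{
  "type": "data",
  "name": "EndTxnMarker",
  "validVersions": "0-1",
  "flexibleVersions": "1+",
  "fields": [
    { "name": "CoordinatorEpoch", "type": "int32", "versions": "0+" }
  ]
}
```

Version 0 would keep the old fixed layout, so existing brokers could still 
read the marker.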









[GitHub] [kafka] chia7712 commented on pull request #9712: KAFKA-10822 Force some stdout from downgrade_test.py/upgrade_test.py for Travis

2020-12-18 Thread GitBox


chia7712 commented on pull request #9712:
URL: https://github.com/apache/kafka/pull/9712#issuecomment-748409302


   > Looks like the build is still in error?
   
   Thanks for your reviews. The main purpose of this PR is to avoid the 
timeouts from downgrade_test.py/upgrade_test.py. I browsed the test results, 
and the timeout no longer happens in downgrade_test.py/upgrade_test.py. After 
fixing the timeout issue, I will go back to deal with the genuine system test 
failures.







[jira] [Updated] (KAFKA-10687) Produce request should be bumped for new error code PRODUCE_FENCED

2020-12-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10687?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10687:

Description: In https://issues.apache.org/jira/browse/KAFKA-9911, we missed 
a case where the ProduceRequest needs to be bumped to return the new error code 
PRODUCE_FENCED. This gap needs to be addressed as a blocker since it is 
shipping in 2.7.  (was: In https://issues.apache.org/jira/browse/KAFKA-9910, we 
missed a case where the ProduceRequest needs to be bumped to return the new 
error code PRODUCE_FENCED. This gap needs to be addressed as a blocker since it 
is shipping in 2.7.)

> Produce request should be bumped for new error code PRODUCE_FENCED
> --
>
> Key: KAFKA-10687
> URL: https://issues.apache.org/jira/browse/KAFKA-10687
> Project: Kafka
>  Issue Type: Bug
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Blocker
> Fix For: 2.7.0
>
>
> In https://issues.apache.org/jira/browse/KAFKA-9911, we missed a case where 
> the ProduceRequest needs to be bumped to return the new error code 
> PRODUCE_FENCED. This gap needs to be addressed as a blocker since it is 
> shipping in 2.7.





[jira] [Updated] (KAFKA-10870) Consumer should handle REBALANCE_IN_PROGRESS from JoinGroup

2020-12-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10870:

Description: 
We hit a timeout when persisting group metadata to the __consumer_offsets topic:
{code}
[2020-12-18 18:06:08,209] DEBUG [GroupMetadataManager brokerId=1] Metadata from 
group test_group_id with generation 1 failed when appending to log due to 
org.apache.kafka.common.errors.TimeoutException 
(kafka.coordinator.group.GroupMetadataManager)
[2020-12-18 18:06:08,210] WARN [GroupCoordinator 1]: Failed to persist metadata 
for group test_group_id: The group is rebalancing, so a rejoin is needed. 
(kafka.coordinator.group.GroupCoordinator)
{code}

This in turn resulted in a REBALANCE_IN_PROGRESS being returned from the 
JoinGroup:

{code}
[2020-12-18 18:06:08,211] INFO Completed 
request:RequestHeader(apiKey=JOIN_GROUP, apiVersion=7, 
clientId=consumer-test_group_id-test_group_id-instance-1, correlationId=3) -- 
{group_id=test_group_id,session_timeout_ms=6,rebalance_timeout_ms=30,member_id=,group_instance_id=test_group_id-instance-1,protocol_type=consumer,protocols=[{name=range,metadata=java.nio.HeapByteBuffer[pos=0
 lim=26 
cap=26],_tagged_fields={}}],_tagged_fields={}},response:{throttle_time_ms=0,error_code=27,generation_id=1,protocol_type=consumer,protocol_name=range,leader=test_group_id-instance-2-32e72316-2c3f-40d6-bc34-8ec23d633d34,member_id=,members=[],_tagged_fields={}}
 from connection 
172.31.46.222:9092-172.31.44.169:41310-6;totalTime:5014.825,requestQueueTime:0.193,localTime:11.575,remoteTime:5002.195,throttleTime:0.66,responseQueueTime:0.105,sendTime:0.094,sendIoTime:0.038,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=apache-kafka-java,
 softwareVersion=5.5.3-ce) (kafka.request.logger)  
{code}

The consumer has no logic to handle REBALANCE_IN_PROGRESS from JoinGroup.

{code}
[2020-12-18 18:06:08,210] ERROR [Consumer instanceId=test_group_id-instance-1, 
clientId=consumer-test_group_id-test_group_id-instance-1, 
groupId=test_group_id] Attempt to join group failed due to unexpected error
: The group is rebalancing, so a rejoin is needed. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-12-18 18:06:08,211] INFO [Consumer instanceId=test_group_id-instance-1, 
clientId=consumer-test_group_id-test_group_id-instance-1, 
groupId=test_group_id] Join group failed with org.apache.kafka.common.KafkaE
xception: Unexpected error in join group response: The group is rebalancing, so 
a rejoin is needed. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-12-18 18:06:08,211] ERROR Error during processing, terminating consumer 
process:  (org.apache.kafka.tools.VerifiableConsumer)
org.apache.kafka.common.KafkaException: Unexpected error in join group 
response: The group is rebalancing, so a rejoin is needed.
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:653)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:574)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
{code}

  was:
We hit a timeout when persisting group metadata to the __consumer_offsets topic:
{code}
[2020-12-18 18:06:07,889] DEBUG Created a new incremental FetchContext for 
session id 5832, epoch 53: added 0 partition(s), updated 0 partition(s), 
removed 0 partition(s) (kafka.server.FetchManager)
[2020-12-18 18:06:08,209] DEBUG [GroupMetadataManager brokerId=1] Metadata from 
group test_group_id with generation 1 failed when appending to log due to 
org.apache.kafka.common.errors.TimeoutException 
(kafka.coordinator.group.GroupMetadataManager)
[2020-12-18 18:06:08,210] WARN [GroupCoordinator 1]: Failed to persist metadata 
for group test_group_id: The group is rebalancing, so a rejoin is needed. 
(kafka.coordinator.group.GroupCoordinator)
{code}

This in turn resulted in a REBALANCE_IN_PROGRESS being returned from the 
JoinGroup:

{code}
[2020-12-18 18:06:08,211] INFO Completed 
request:RequestHeader(apiKey=JOIN_GROUP, apiVersion=7, 
clientId=consumer-test_group_id-test_group_id-instance-1, correlationId=3) -- 
{group_id=test_group_id,session_timeout_ms=6,rebalance_timeout_ms=30,member_id=,group_instance_id=test_group_id-instance-1,protocol_type=consumer,protocols=[{name=range,metadata=java.nio.HeapByteBuffer[pos=0
 lim=26 

[jira] [Created] (KAFKA-10870) Consumer should handle REBALANCE_IN_PROGRESS from JoinGroup

2020-12-18 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10870:
---

 Summary: Consumer should handle REBALANCE_IN_PROGRESS from 
JoinGroup
 Key: KAFKA-10870
 URL: https://issues.apache.org/jira/browse/KAFKA-10870
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


We hit a timeout when persisting group metadata to the __consumer_offsets topic:
{code}
[2020-12-18 18:06:07,889] DEBUG Created a new incremental FetchContext for 
session id 5832, epoch 53: added 0 partition(s), updated 0 partition(s), 
removed 0 partition(s) (kafka.server.FetchManager)
[2020-12-18 18:06:08,209] DEBUG [GroupMetadataManager brokerId=1] Metadata from 
group test_group_id with generation 1 failed when appending to log due to 
org.apache.kafka.common.errors.TimeoutException 
(kafka.coordinator.group.GroupMetadataManager)
[2020-12-18 18:06:08,210] WARN [GroupCoordinator 1]: Failed to persist metadata 
for group test_group_id: The group is rebalancing, so a rejoin is needed. 
(kafka.coordinator.group.GroupCoordinator)
{code}

This in turn resulted in a REBALANCE_IN_PROGRESS being returned from the 
JoinGroup:

{code}
[2020-12-18 18:06:08,211] INFO Completed 
request:RequestHeader(apiKey=JOIN_GROUP, apiVersion=7, 
clientId=consumer-test_group_id-test_group_id-instance-1, correlationId=3) -- 
{group_id=test_group_id,session_timeout_ms=6,rebalance_timeout_ms=30,member_id=,group_instance_id=test_group_id-instance-1,protocol_type=consumer,protocols=[{name=range,metadata=java.nio.HeapByteBuffer[pos=0
 lim=26 
cap=26],_tagged_fields={}}],_tagged_fields={}},response:{throttle_time_ms=0,error_code=27,generation_id=1,protocol_type=consumer,protocol_name=range,leader=test_group_id-instance-2-32e72316-2c3f-40d6-bc34-8ec23d633d34,member_id=,members=[],_tagged_fields={}}
 from connection 
172.31.46.222:9092-172.31.44.169:41310-6;totalTime:5014.825,requestQueueTime:0.193,localTime:11.575,remoteTime:5002.195,throttleTime:0.66,responseQueueTime:0.105,sendTime:0.094,sendIoTime:0.038,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT,clientInformation:ClientInformation(softwareName=apache-kafka-java,
 softwareVersion=5.5.3-ce) (kafka.request.logger)  
{code}

The consumer has no logic to handle REBALANCE_IN_PROGRESS from JoinGroup.

{code}
[2020-12-18 18:06:08,210] ERROR [Consumer instanceId=test_group_id-instance-1, 
clientId=consumer-test_group_id-test_group_id-instance-1, 
groupId=test_group_id] Attempt to join group failed due to unexpected error
: The group is rebalancing, so a rejoin is needed. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-12-18 18:06:08,211] INFO [Consumer instanceId=test_group_id-instance-1, 
clientId=consumer-test_group_id-test_group_id-instance-1, 
groupId=test_group_id] Join group failed with org.apache.kafka.common.KafkaE
xception: Unexpected error in join group response: The group is rebalancing, so 
a rejoin is needed. 
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-12-18 18:06:08,211] ERROR Error during processing, terminating consumer 
process:  (org.apache.kafka.tools.VerifiableConsumer)
org.apache.kafka.common.KafkaException: Unexpected error in join group 
response: The group is rebalancing, so a rejoin is needed.
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:653)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:574)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076)
at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
{code}
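
A hedged sketch of the missing handling (a simplified model, not the actual 
`AbstractCoordinator` code): REBALANCE_IN_PROGRESS (error code 27) on 
JoinGroup could be treated as retriable, causing the member to rejoin, rather 
than falling through to the fatal "unexpected error" path:

{code}
// Simplified model of JoinGroup error handling, for illustration only.
enum JoinGroupError { NONE, MEMBER_ID_REQUIRED, REBALANCE_IN_PROGRESS, UNKNOWN_ERROR }

class JoinErrorHandler {
    // Returns the action the consumer should take for a given JoinGroup error.
    static String handle(JoinGroupError error) {
        switch (error) {
            case NONE:
                return "joined";
            case MEMBER_ID_REQUIRED:
                return "retry";   // known retriable path: rejoin with member id
            case REBALANCE_IN_PROGRESS:
                return "retry";   // the missing branch: the group is rebalancing, just rejoin
            default:
                return "fatal";   // unexpected error: terminate the consumer
        }
    }
}
{code}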





[GitHub] [kafka] glasser commented on pull request #9767: KAFKA-10047: Remove unnecessary widening of (int to long) scope in FloatSerializer.

2020-12-18 Thread GitBox


glasser commented on pull request #9767:
URL: https://github.com/apache/kafka/pull/9767#issuecomment-748376004


   I'm not sure why I'm tagged; did you mean somebody else?







[jira] [Resolved] (KAFKA-10545) Create topic IDs and propagate to brokers

2020-12-18 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan resolved KAFKA-10545.

Resolution: Done

> Create topic IDs and propagate to brokers
> -
>
> Key: KAFKA-10545
> URL: https://issues.apache.org/jira/browse/KAFKA-10545
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Major
>
> First step for KIP-516
> The goals are:
>  * Create and store topic IDs in a ZK Node and controller memory.
>  * Propagate topic ID to brokers with updated LeaderAndIsrRequest, 
> UpdateMetadata
>  * Store topic ID in memory on broker, persistent file in log





[GitHub] [kafka] rajinisivaram merged pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-12-18 Thread GitBox


rajinisivaram merged pull request #9626:
URL: https://github.com/apache/kafka/pull/9626


   







[GitHub] [kafka] rajinisivaram commented on pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

2020-12-18 Thread GitBox


rajinisivaram commented on pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#issuecomment-748345741


   @jolshan Thanks for the PR, the last build was good, so merging to trunk







[GitHub] [kafka] rajinisivaram merged pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-12-18 Thread GitBox


rajinisivaram merged pull request #9622:
URL: https://github.com/apache/kafka/pull/9622


   







[GitHub] [kafka] rajinisivaram commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-12-18 Thread GitBox


rajinisivaram commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-748328137


   @dengziming Thanks for the PR, merging to trunk.







[GitHub] [kafka] tguruprasad opened a new pull request #9767: KAFKA-10047: Remove unnecessary widening of (int to long) scope in FloatSerializer.

2020-12-18 Thread GitBox


tguruprasad opened a new pull request #9767:
URL: https://github.com/apache/kafka/pull/9767


   @halorgium @astubbs @alexism @glasser @rhardouin 







[GitHub] [kafka] cmccabe merged pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

2020-12-18 Thread GitBox


cmccabe merged pull request #9736:
URL: https://github.com/apache/kafka/pull/9736


   







[GitHub] [kafka] cmccabe commented on pull request #9736: Add configurable workloads and E2E latency tracking to Trogdor.

2020-12-18 Thread GitBox


cmccabe commented on pull request #9736:
URL: https://github.com/apache/kafka/pull/9736#issuecomment-748317827


   > I wanted this timestamp to live outside the Kafka broker/client, as that 
is what customers will see. We can also validate the numbers within Kafka on 
this if we want, I just wanted to make sure we captured every bit of latency 
available. We're running this test on systems that are millisecond synced to 
get an accurate reading.
   
   OK.
   
   > The Throttle architecture didn't exactly work for the variable throughput 
generation. In order to add partition-specific testing I had to limit the test 
to 1 topic. I am by no means an experienced Java engineer and I couldn't find a 
way of modifying the original, or even subclassing it, without forcing the spec 
to change.
   
   Hmm.  We could probably do some more work to share code, but in the 
meantime, this is a nice improvement.
   
   LGTM.  Thanks, @scott-hendricks 







[jira] [Updated] (KAFKA-10869) Gate topic IDs behind IBP 2.8

2020-12-18 Thread Justine Olshan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10869?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Justine Olshan updated KAFKA-10869:
---
Priority: Blocker  (was: Major)

> Gate topic IDs behind IBP 2.8
> -
>
> Key: KAFKA-10869
> URL: https://issues.apache.org/jira/browse/KAFKA-10869
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Justine Olshan
>Assignee: Justine Olshan
>Priority: Blocker
>
> We want to do this so we don't lose topic IDs upon downgrades. If we 
> downgrade and write to topic node in ZK, the topic ID will be lost.





[jira] [Created] (KAFKA-10869) Gate topic IDs behind IBP 2.8

2020-12-18 Thread Justine Olshan (Jira)
Justine Olshan created KAFKA-10869:
--

 Summary: Gate topic IDs behind IBP 2.8
 Key: KAFKA-10869
 URL: https://issues.apache.org/jira/browse/KAFKA-10869
 Project: Kafka
  Issue Type: Sub-task
Reporter: Justine Olshan
Assignee: Justine Olshan


We want to do this so we don't lose topic IDs upon downgrades. If we downgrade 
and write to topic node in ZK, the topic ID will be lost.





[GitHub] [kafka] hachikuji commented on a change in pull request #9713: KAFKA-10825 ZK ISR manager

2020-12-18 Thread GitBox


hachikuji commented on a change in pull request #9713:
URL: https://github.com/apache/kafka/pull/9713#discussion_r546044857



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1374,46 +1314,29 @@ class Partition(val topicPartition: TopicPartition,
     }
   }
 
-  private def shrinkIsrWithZk(newIsr: Set[Int]): Unit = {
-    val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.toList, zkVersion)
-    val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-    if (zkVersionOpt.isDefined) {
-      isrChangeListener.markShrink()
-    }
-    maybeUpdateIsrAndVersionWithZk(newIsr, zkVersionOpt)
-  }
-
-  private def maybeUpdateIsrAndVersionWithZk(isr: Set[Int], zkVersionOpt: Option[Int]): Unit = {
-    zkVersionOpt match {
-      case Some(newVersion) =>
-        isrState = CommittedIsr(isr)
-        zkVersion = newVersion
-        info("ISR updated to [%s] and zkVersion updated to [%d]".format(isr.mkString(","), zkVersion))
-
-      case None =>
-        info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, skip updating ISR")
-        isrChangeListener.markFailed()
-    }
-  }
-
   private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
     val isrToSend: Set[Int] = proposedIsrState match {
       case PendingExpandIsr(isr, newInSyncReplicaId) => isr + newInSyncReplicaId
       case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- outOfSyncReplicaIds
       case state =>
+        isrChangeListener.markFailed()
         throw new IllegalStateException(s"Invalid state $state for `AlterIsr` request for partition $topicPartition")
     }
 
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, isrToSend.toList, zkVersion)
-    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState))
+    val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, handleAlterIsrResponse(proposedIsrState), controllerEpoch)
 
-    if (!alterIsrManager.enqueue(alterIsrItem)) {
+    val oldState = isrState
+    isrState = proposedIsrState
+
+    if (!alterIsrManager.submit(alterIsrItem)) {
+      // If the ISR manager did not accept our update, we need to revert back to the previous state
+      isrState = oldState
       isrChangeListener.markFailed()
       throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request with state " +
         s"$newLeaderAndIsr for partition $topicPartition")
     }
 
-    isrState = proposedIsrState
     debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after transition to $proposedIsrState")

Review comment:
   Should we generalize this log message as well? Also the 
`IllegalStateException` message above.
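Stepping back, the diff above follows a simple optimistic-update-with-rollback pattern: record the old state, apply the proposed state, and revert if the manager rejects the submission. A generic sketch of that pattern (the class and names below are illustrative, not Kafka's actual types):

```java
import java.util.function.Predicate;

// Illustrative sketch of the update-then-revert pattern used in the diff
// above; OptimisticState is a hypothetical helper, not a Kafka class.
class OptimisticState<S> {
    private S state;

    OptimisticState(S initial) { state = initial; }

    S current() { return state; }

    /** Optimistically apply the proposed state; revert and throw if rejected. */
    void transition(S proposed, Predicate<S> submit) {
        S old = state;
        state = proposed;            // apply before submitting
        if (!submit.test(proposed)) {
            state = old;             // roll back on rejection
            throw new IllegalStateException("Failed to enqueue request for " + proposed);
        }
    }

    public static void main(String[] args) {
        OptimisticState<String> s = new OptimisticState<>("CommittedIsr");
        s.transition("PendingShrinkIsr", p -> true);    // accepted: state advances
        System.out.println(s.current());
        try {
            s.transition("PendingExpandIsr", p -> false); // rejected: state reverts
        } catch (IllegalStateException e) {
            System.out.println(s.current());
        }
    }
}
```

The key property is that the rollback happens before the exception propagates, so callers never observe the rejected state.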

##
File path: core/src/main/scala/kafka/server/ZkIsrManager.scala
##
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.{Logging, ReplicationUtils, Scheduler}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.utils.Time
+
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable
+
+/**
+ * @param checkIntervalMs How often to check for ISR
+ * @param maxDelayMs  Maximum time that an ISR change may be delayed before 
sending the notification
+ * @param lingerMs  Maximum time to await additional changes before sending 
the notification
+ */
+case class IsrChangePropagationConfig(checkIntervalMs: Long, maxDelayMs: Long, 
lingerMs: Long)
+
+object ZkIsrManager {
+  // This field is mutable to allow overriding change notification behavior in 
test cases
+  @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = 
IsrChangePropagationConfig(
+checkIntervalMs = 2500,
+lingerMs = 5000,
+maxDelayMs = 6,
+  )
+}
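The three timing parameters documented above drive a simple batching policy: a task runs every `checkIntervalMs` and propagates pending ISR changes once either no new change has arrived for `lingerMs` or the oldest pending change has waited `maxDelayMs`. A minimal sketch of that decision, under the assumption that this is how the scheduled check consumes the config (the class below is illustrative, not Kafka's internals):

```java
// Illustrative sketch of the linger/max-delay batching decision described by
// the IsrChangePropagationConfig Javadoc above; this class is hypothetical.
public class IsrChangeBatcher {
    private final long lingerMs;
    private final long maxDelayMs;

    public IsrChangeBatcher(long lingerMs, long maxDelayMs) {
        this.lingerMs = lingerMs;
        this.maxDelayMs = maxDelayMs;
    }

    /**
     * Called on every scheduled check (i.e. every checkIntervalMs). Returns
     * true if the accumulated ISR changes should be propagated now.
     */
    public boolean shouldPropagate(boolean hasPendingChanges, long lastChangeMs,
                                   long oldestChangeMs, long nowMs) {
        if (!hasPendingChanges) return false;
        boolean lingerExpired = nowMs - lastChangeMs >= lingerMs;    // traffic went quiet
        boolean maxDelayHit = nowMs - oldestChangeMs >= maxDelayMs;  // waited long enough
        return lingerExpired || maxDelayHit;
    }

    public static void main(String[] args) {
        IsrChangeBatcher b = new IsrChangeBatcher(5000, 60000);
        System.out.println(b.shouldPropagate(true, 0, 0, 6000));      // linger expired
        System.out.println(b.shouldPropagate(true, 5500, 0, 6000));   // still lingering
        System.out.println(b.shouldPropagate(true, 59000, 0, 61000)); // max delay hit
    }
}
```

The linger bound batches bursts of changes into one notification, while the max-delay bound caps how stale a notification can get under sustained churn.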
+
+class ZkIsrManager(scheduler: Scheduler, time: Time, zkClient: KafkaZkClient) 
extends AlterIsrManager with Logging {
+
+  private val isrChangeNotificationConfig = 
ZkIsrManager.DefaultIsrPropagationConfig
+  // Visible for testing
+  private[server] 

[GitHub] [kafka] JimGalasyn commented on pull request #9670: DOCS-6076: Clarify config names for EOS versions 1 and 2

2020-12-18 Thread GitBox


JimGalasyn commented on pull request #9670:
URL: https://github.com/apache/kafka/pull/9670#issuecomment-748281814


   @abbccdda cool, does this work? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10843) Kafka Streams metadataForKey method returns null but allMetadata has the details

2020-12-18 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251960#comment-17251960
 ] 

Matthias J. Sax commented on KAFKA-10843:
-

{quote}You want me to update the Kafka-streams version to 2.6.0?
{quote}
It might be worth a try.
{quote}With regards to using allMetadata(), this won't still give me the
particular store where my key is right? It would give all the hosts where the 
store is present, but my key could be on any of them. 
{quote}
Well, yes, but you see the partition information, and thus you could hash the 
key yourself to find the right instance.

Curious to hear your answer to Bruno's questions. – If you use custom 
partitioning, you would need to hand in a custom `StreamsPartitioner` to allow 
KafkaStreams to compute the right partitions to find the right instance.
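The "hash the key yourself" suggestion can be sketched as follows. The murmur2 port below mirrors what Kafka's default partitioner does (murmur2 over the serialized key bytes, positive modulo the partition count), but it should be verified against `org.apache.kafka.common.utils.Utils.murmur2` for the client version in use, and if the producer used a custom partitioner that logic must be mirrored instead:

```java
import java.nio.charset.StandardCharsets;

// Standalone sketch: map a serialized key to its partition the way Kafka's
// default partitioner does. Verify against Utils.murmur2 before relying on it.
public class KeyLocator {

    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the remaining 1-3 bytes (intentional switch fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff; h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    /** Partition a serialized key the way the default partitioner would. */
    static int partitionFor(byte[] serializedKey, int numPartitions) {
        return (murmur2(serializedKey) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "order-42".getBytes(StandardCharsets.UTF_8);
        int partition = partitionFor(key, 6);
        // With the partition known, pick the host from allMetadata() whose
        // topic partitions include this partition for the store's topic.
        System.out.println("key maps to partition " + partition);
    }
}
```

Once the partition is known, the right instance is the one whose `StreamsMetadata.topicPartitions()` contains that partition.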

> Kafka Streams metadataForKey method returns null but allMetadata has the 
> details
> 
>
> Key: KAFKA-10843
> URL: https://issues.apache.org/jira/browse/KAFKA-10843
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.1
>Reporter: Maria Thomas
>Priority: Major
>
> Our application runs on multiple instances. To get the key information from
> the state store, we use the "metadataForKey" method to retrieve the
> StreamsMetadata and, using the hostname, make an RPC call to the host to get
> the value associated with the key.
> This call was working fine in our DEV and TEST environments; however, it has
> been failing on one of our production clusters from the start. On further debugging, I
> noticed the allMetadata() method was returning the state stores with the host 
> details as expected. However, it would not be feasible to go through each 
> store explicitly to get the key details.
> To note, the cluster I am using is a stretch cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10868) Avoid double wrapping KafkaException

2020-12-18 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10868:

Description: Today certain exceptions get double-wrapped in KafkaException.
We should remove those cases.

> Avoid double wrapping KafkaException
> 
>
> Key: KAFKA-10868
> URL: https://issues.apache.org/jira/browse/KAFKA-10868
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> Today certain exceptions get double-wrapped in KafkaException. We should remove
> those cases





[jira] [Created] (KAFKA-10868) Avoid double wrapping KafkaException

2020-12-18 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10868:
---

 Summary: Avoid double wrapping KafkaException
 Key: KAFKA-10868
 URL: https://issues.apache.org/jira/browse/KAFKA-10868
 Project: Kafka
  Issue Type: Sub-task
Reporter: Boyang Chen








[GitHub] [kafka] ncliang commented on a change in pull request #9765: KAFKA-10763: Fix incomplete cooperative rebalances preventing connector/task revocations

2020-12-18 Thread GitBox


ncliang commented on a change in pull request #9765:
URL: https://github.com/apache/kafka/pull/9765#discussion_r546001555



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
##
@@ -566,6 +566,112 @@ public Boolean answer() throws Throwable {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testRevoke() throws TimeoutException {
+revokeAndReassign(false);
+}
+
+@Test
+public void testIncompleteRebalanceBeforeRevoke() throws TimeoutException {
+revokeAndReassign(true);
+}
+
+public void revokeAndReassign(boolean incompleteRebalance) throws 
TimeoutException {

Review comment:
   It's hard to tell if this actually reproduces the issue or not due to 
the heavy mocking required. Is there a more direct way to reproduce? Maybe in 
`RebalanceSourceConnectorsIntegrationTest` or similar? Even if the IT ends up 
being flaky, having that repro would boost confidence in this fix.

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -1740,7 +1741,7 @@ public void onRevoked(String leader, Collection 
connectors, Collection= 
CONNECT_PROTOCOL_V1) {

Review comment:
   Maybe add a comment explaining why the additional check is needed.









[GitHub] [kafka] d8tltanc commented on pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-18 Thread GitBox


d8tltanc commented on pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#issuecomment-748242438


   Thank you @rajinisivaram 







[GitHub] [kafka] rajinisivaram commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

2020-12-18 Thread GitBox


rajinisivaram commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-748239592


   There is a compilation failure in the Java 8 build because the PR uses an 
API not available in Scala 2.12. Will push a fix.







[GitHub] [kafka] rajinisivaram merged pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-18 Thread GitBox


rajinisivaram merged pull request #9485:
URL: https://github.com/apache/kafka/pull/9485


   







[GitHub] [kafka] abbccdda commented on a change in pull request #9766: KAFKA-10864: Convert end txn marker schema to use auto-generated protocol

2020-12-18 Thread GitBox


abbccdda commented on a change in pull request #9766:
URL: https://github.com/apache/kafka/pull/9766#discussion_r545985013



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##
@@ -618,8 +618,7 @@ public static MemoryRecords withEndTransactionMarker(long 
timestamp, long produc
 public static MemoryRecords withEndTransactionMarker(long initialOffset, 
long timestamp, int partitionLeaderEpoch,
  long producerId, 
short producerEpoch,
  EndTransactionMarker 
marker) {
-int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD +
-EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE;
+int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD + 
marker.endTnxMarkerValueSize();

Review comment:
   s/endTnxMarkerValueSize/endTxnMarkerValueSize

##
File path: clients/src/main/resources/common/message/EndTxnMarker.json
##
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "type": "data",
+  "name": "EndTxnMarker",
+  "validVersions": "0",
+  "fields": [
+{ "name": "CoordinatorEpoch", "type": "int32", "versions": "0+"}

Review comment:
   Add a comment for the field. Also do you think we need flexible version 
support here?









[GitHub] [kafka] cadonna commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-12-18 Thread GitBox


cadonna commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r545986204



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
   Could you not use the same technique as you use in the in-memory 
key-value state store and the cache?
   
   ```
   final Bytes from = Bytes.wrap(prefixKeySerializer.serialize(null, 
prefix));
   final Bytes to = Bytes.increment(from);
   ```
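The `Bytes.increment` trick computes an exclusive upper bound for the prefix range: bump the last byte that is not 0xFF and discard everything after it. A standalone sketch of that idea (illustrative code, not a copy of Kafka's actual `Bytes` implementation):

```java
import java.util.Arrays;

// Sketch of deriving the exclusive upper bound of a prefix range: increment
// the last non-0xFF byte and drop the trailing 0xFF bytes. Any key that
// starts with the prefix compares lexicographically below this bound.
public class PrefixRange {
    static byte[] increment(byte[] input) {
        byte[] result = Arrays.copyOf(input, input.length);
        for (int i = result.length - 1; i >= 0; i--) {
            if (result[i] != (byte) 0xFF) {
                result[i]++;                          // found a byte we can bump
                return Arrays.copyOf(result, i + 1);  // drop trailing 0xFF bytes
            }
        }
        // All bytes were 0xFF: no finite exclusive upper bound exists.
        throw new IllegalArgumentException("prefix is all 0xFF bytes");
    }

    public static void main(String[] args) {
        byte[] from = {0x61, 0x62};              // prefix "ab"
        byte[] to = increment(from);             // exclusive end "ac"
        System.out.println(Arrays.toString(to)); // [97, 99]
    }
}
```

With `from` and `to` in hand, a plain range iterator covers exactly the keys sharing the prefix, which is why the in-memory store and cache can avoid a dedicated prefix iterator.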









[jira] [Created] (KAFKA-10867) Implement improved semantics using the ConsumerRecords meta

2020-12-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10867:


 Summary: Implement improved semantics using the ConsumerRecords 
meta
 Key: KAFKA-10867
 URL: https://issues.apache.org/jira/browse/KAFKA-10867
 Project: Kafka
  Issue Type: Sub-task
  Components: streams
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.8.0








[jira] [Created] (KAFKA-10866) Add fetched metadata to ConsumerRecords

2020-12-18 Thread John Roesler (Jira)
John Roesler created KAFKA-10866:


 Summary: Add fetched metadata to ConsumerRecords
 Key: KAFKA-10866
 URL: https://issues.apache.org/jira/browse/KAFKA-10866
 Project: Kafka
  Issue Type: Sub-task
  Components: consumer
Reporter: John Roesler
Assignee: John Roesler
 Fix For: 2.8.0


Consumer-side changes for KIP-695





[GitHub] [kafka] mumrah commented on a change in pull request #9713: KAFKA-10825 ZK ISR manager

2020-12-18 Thread GitBox


mumrah commented on a change in pull request #9713:
URL: https://github.com/apache/kafka/pull/9713#discussion_r545964382



##
File path: core/src/main/scala/kafka/cluster/Partition.scala
##
@@ -1374,47 +1314,28 @@ class Partition(val topicPartition: TopicPartition,
 }
   }
 
-  private def shrinkIsrWithZk(newIsr: Set[Int]): Unit = {
-val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
newIsr.toList, zkVersion)
-val zkVersionOpt = stateStore.shrinkIsr(controllerEpoch, newLeaderAndIsr)
-if (zkVersionOpt.isDefined) {
-  isrChangeListener.markShrink()
-}
-maybeUpdateIsrAndVersionWithZk(newIsr, zkVersionOpt)
-  }
-
-  private def maybeUpdateIsrAndVersionWithZk(isr: Set[Int], zkVersionOpt: 
Option[Int]): Unit = {
-zkVersionOpt match {
-  case Some(newVersion) =>
-isrState = CommittedIsr(isr)
-zkVersion = newVersion
-info("ISR updated to [%s] and zkVersion updated to 
[%d]".format(isr.mkString(","), zkVersion))
-
-  case None =>
-info(s"Cached zkVersion $zkVersion not equal to that in zookeeper, 
skip updating ISR")
-isrChangeListener.markFailed()
-}
-  }
-
   private def sendAlterIsrRequest(proposedIsrState: IsrState): Unit = {
 val isrToSend: Set[Int] = proposedIsrState match {
   case PendingExpandIsr(isr, newInSyncReplicaId) => isr + 
newInSyncReplicaId
   case PendingShrinkIsr(isr, outOfSyncReplicaIds) => isr -- 
outOfSyncReplicaIds
   case state =>
+isrChangeListener.markFailed()
 throw new IllegalStateException(s"Invalid state $state for `AlterIsr` 
request for partition $topicPartition")
 }
 
 val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
isrToSend.toList, zkVersion)
-val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, 
handleAlterIsrResponse(proposedIsrState))
+val alterIsrItem = AlterIsrItem(topicPartition, newLeaderAndIsr, 
handleAlterIsrResponse(proposedIsrState), controllerEpoch)
 
-if (!alterIsrManager.enqueue(alterIsrItem)) {
-  isrChangeListener.markFailed()
-  throw new IllegalStateException(s"Failed to enqueue `AlterIsr` request 
with state " +
-s"$newLeaderAndIsr for partition $topicPartition")
-}
-
-isrState = proposedIsrState
-debug(s"Sent `AlterIsr` request to change state to $newLeaderAndIsr after 
transition to $proposedIsrState")
+alterIsrManager.submit(alterIsrItem, (wasSubmitted: Boolean) => {
+  if (wasSubmitted) {
+isrState = proposedIsrState

Review comment:
   Yea, this seems simpler than the callback thing. I've made this change
in the latest commit.









[GitHub] [kafka] vamossagar12 commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-12-18 Thread GitBox


vamossagar12 commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r545958971



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
   Well, the only reason I chose to add a separate iterator is that for
prefix scan there is no end key that can be known upfront. We could pass a null
end key, but that's prohibited in the RocksDBRangeIterator constructor, and I
don't think we should change that condition; alternatively, we could pass the
same value for both from and to and add a condition in makeNext() for prefix
scan. I thought having a separate iterator might be cleaner. Do you have some
other approach in mind?
   









[jira] [Commented] (KAFKA-10865) Improve trace-logging for Transformations (including Predicates)

2020-12-18 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251851#comment-17251851
 ] 

Randall Hauch commented on KAFKA-10865:
---

We currently trace-log the record passed into a transformation, but it looks 
like we don't trace-log the record just before it is passed to the connector.

We may want to have slightly different log messages for source and sink 
connectors. If so, that would make it difficult to add all of the log messages 
in `TransformationChain`. So we might also need to think about the trace-log 
messages in `WorkerSourceTask` and `WorkerSinkTask` near where the 
transformation chain is called.

We need to be careful about changing existing log messages, in case users are 
relying upon specific log messages for any automated tooling. Trace messages 
are probably a bit easier to change if necessary, since it's unlikely those 
would be enabled in production, but we should still strive for minimally 
changing existing log messages.

[~govi20], thank you for volunteering to take this ticket. You assigned 
yourself, so +1. Looking forward to a PR. :-)

> Improve trace-logging for Transformations (including Predicates)
> 
>
> Key: KAFKA-10865
> URL: https://issues.apache.org/jira/browse/KAFKA-10865
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Assignee: Govinda Sakhare
>Priority: Major
>  Labels: newbie
>
> I've been spending [a bunch of time poking around 
> SMTs|https://rmoff.net/categories/twelvedaysofsmt/] recently, and one common 
> challenge I've had is being able to debug when things don't behave as I 
> expect.
>   
>  I know that there is the {{TransformationChain}} logger, but this only gives 
> (IIUC) the input record
> {code:java}
> [2020-12-17 09:38:58,057] TRACE [sink-simulator-day12-00|task-0] Applying 
> transformation io.confluent.connect.transforms.Filter$Value to 
> SinkRecord{kafkaOffset=10551, timestampType=CreateTime} 
> ConnectRecord{topic='day12-sys01', kafkaPartition=0, 
> key=2c2ceb9b-8b31-4ade-a757-886ebfb7a398, keySchema=Schema{STRING}, 
> value=Struct{units=16,product=Founders Breakfast 
> Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01}, 
> valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1608197938054, 
> headers=ConnectHeaders(headers=)} 
> (org.apache.kafka.connect.runtime.TransformationChain:47)
> {code}
>  
>  I think it would be really useful to also have trace level logging that 
> included:
>  - the _output_ of *each* transform
>  - the evaluation and result of any `predicate`s
> I have been using 
> {{com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector}} 
> which is really useful for seeing the final record:
> {code:java}
> [2020-12-17 09:38:58,057] INFO [sink-simulator-day12-00|task-0] 
> record.value=Struct{units=16,product=Founders Breakfast 
> Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01} 
> (com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)
> {code}
>  
>  But doesn't include things like topic name (which is often changed by common 
> SMTs)





[GitHub] [kafka] rajinisivaram commented on pull request #9485: KAKFA-10619: Idempotent producer will get authorized once it has a WRITE access to at least one topic

2020-12-18 Thread GitBox


rajinisivaram commented on pull request #9485:
URL: https://github.com/apache/kafka/pull/9485#issuecomment-748190567


   @d8tltanc Thanks for your patience with this PR, builds look good, merging 
to trunk.







[jira] [Resolved] (KAFKA-10100) LiveLeaders field in LeaderAndIsrRequest is not used anymore

2020-12-18 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10100?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-10100.
-
Resolution: Won't Fix

This is not worthwhile. 

> LiveLeaders field in LeaderAndIsrRequest is not used anymore
> 
>
> Key: KAFKA-10100
> URL: https://issues.apache.org/jira/browse/KAFKA-10100
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> We have noticed that the `LiveLeaders` field in the LeaderAndIsrRequest is 
> not used anywhere but still populated by the controller.
> It seems that that field was introduced in AK `0.8.0` and was supposed to be 
> removed in AK `0.8.1`: 
> [https://github.com/apache/kafka/blob/0.8.0/core/src/main/scala/kafka/cluster/Partition.scala#L194.]
> I think that we can safely deprecate the field and stop populating it for all 
> versions > 0.





[GitHub] [kafka] dajac commented on pull request #8801: KAFKA-10100; LiveLeaders field in LeaderAndIsrRequest is not used anymore

2020-12-18 Thread GitBox


dajac commented on pull request #8801:
URL: https://github.com/apache/kafka/pull/8801#issuecomment-748177193


   Won't proceed with this one.







[GitHub] [kafka] dajac closed pull request #8801: KAFKA-10100; LiveLeaders field in LeaderAndIsrRequest is not used anymore

2020-12-18 Thread GitBox


dajac closed pull request #8801:
URL: https://github.com/apache/kafka/pull/8801


   







[jira] [Assigned] (KAFKA-10865) Improve trace-logging for Transformations (including Predicates)

2020-12-18 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-10865:
---

Assignee: Govinda Sakhare

> Improve trace-logging for Transformations (including Predicates)
> 
>
> Key: KAFKA-10865
> URL: https://issues.apache.org/jira/browse/KAFKA-10865
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Assignee: Govinda Sakhare
>Priority: Major
>  Labels: newbie
>
> I've been spending [a bunch of time poking around 
> SMTs|https://rmoff.net/categories/twelvedaysofsmt/] recently, and one common 
> challenge I've had is being able to debug when things don't behave as I 
> expect.
>   
>  I know that there is the {{TransformationChain}} logger, but this only gives 
> (IIUC) the input record
> {code:java}
> [2020-12-17 09:38:58,057] TRACE [sink-simulator-day12-00|task-0] Applying 
> transformation io.confluent.connect.transforms.Filter$Value to 
> SinkRecord{kafkaOffset=10551, timestampType=CreateTime} 
> ConnectRecord{topic='day12-sys01', kafkaPartition=0, 
> key=2c2ceb9b-8b31-4ade-a757-886ebfb7a398, keySchema=Schema{STRING}, 
> value=Struct{units=16,product=Founders Breakfast 
> Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01}, 
> valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1608197938054, 
> headers=ConnectHeaders(headers=)} 
> (org.apache.kafka.connect.runtime.TransformationChain:47)
> {code}
>  
>  I think it would be really useful to also have trace level logging that 
> included:
>  - the _output_ of *each* transform
>  - the evaluation and result of any `predicate`s
> I have been using 
> {{com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector}} 
> which is really useful for seeing the final record:
> {code:java}
> [2020-12-17 09:38:58,057] INFO [sink-simulator-day12-00|task-0] 
> record.value=Struct{units=16,product=Founders Breakfast 
> Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01} 
> (com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)
> {code}
>  
>  But doesn't include things like topic name (which is often changed by common 
> SMTs)





[GitHub] [kafka] APaMio commented on pull request #9750: MINOR: Change toArray usage for Increase efficiency

2020-12-18 Thread GitBox


APaMio commented on pull request #9750:
URL: https://github.com/apache/kafka/pull/9750#issuecomment-748161493


   Roger that, thanks.







[GitHub] [kafka] APaMio removed a comment on pull request #9750: MINOR: Change toArray usage for Increase efficiency

2020-12-18 Thread GitBox


APaMio removed a comment on pull request #9750:
URL: https://github.com/apache/kafka/pull/9750#issuecomment-748159574


   Roger, thanks.







[GitHub] [kafka] APaMio commented on pull request #9750: MINOR: Change toArray usage for Increase efficiency

2020-12-18 Thread GitBox


APaMio commented on pull request #9750:
URL: https://github.com/apache/kafka/pull/9750#issuecomment-748159574


   Roger, thanks.







[jira] [Commented] (KAFKA-10865) Improve trace-logging for Transformations (including Predicates)

2020-12-18 Thread Govinda Sakhare (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251805#comment-17251805
 ] 

Govinda Sakhare commented on KAFKA-10865:
-

Should I work on this?

> Improve trace-logging for Transformations (including Predicates)
> 
>
> Key: KAFKA-10865
> URL: https://issues.apache.org/jira/browse/KAFKA-10865
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Robin Moffatt
>Priority: Major
>  Labels: newbie
>
> I've been spending [a bunch of time poking around 
> SMTs|https://rmoff.net/categories/twelvedaysofsmt/] recently, and one common 
> challenge I've had is being able to debug when things don't behave as I 
> expect.
>   
>  I know that there is the {{TransformationChain}} logger, but this only gives 
> (IIUC) the input record
> {code:java}
> [2020-12-17 09:38:58,057] TRACE [sink-simulator-day12-00|task-0] Applying 
> transformation io.confluent.connect.transforms.Filter$Value to 
> SinkRecord{kafkaOffset=10551, timestampType=CreateTime} 
> ConnectRecord{topic='day12-sys01', kafkaPartition=0, 
> key=2c2ceb9b-8b31-4ade-a757-886ebfb7a398, keySchema=Schema{STRING}, 
> value=Struct{units=16,product=Founders Breakfast 
> Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01}, 
> valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1608197938054, 
> headers=ConnectHeaders(headers=)} 
> (org.apache.kafka.connect.runtime.TransformationChain:47)
> {code}
>  
>  I think it would be really useful to also have trace level logging that 
> included:
>  - the _output_ of *each* transform
>  - the evaluation and result of any `predicate`s
> I have been using 
> {{com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector}} 
> which is really useful for seeing the final record:
> {code:java}
> [2020-12-17 09:38:58,057] INFO [sink-simulator-day12-00|task-0] 
> record.value=Struct{units=16,product=Founders Breakfast 
> Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01} 
> (com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)
> {code}
>  
>  But it doesn't include things like the topic name (which is often changed by 
> common SMTs).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10865) Improve trace-logging for Transformations (including Predicates)

2020-12-18 Thread Govinda Sakhare (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251805#comment-17251805
 ] 

Govinda Sakhare edited comment on KAFKA-10865 at 12/18/20, 3:04 PM:


would it fine if I take this ticket?


was (Author: govi20):
Should I work on this?






[jira] [Comment Edited] (KAFKA-10865) Improve trace-logging for Transformations (including Predicates)

2020-12-18 Thread Govinda Sakhare (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251805#comment-17251805
 ] 

Govinda Sakhare edited comment on KAFKA-10865 at 12/18/20, 3:04 PM:


would it be fine if I take this ticket?


was (Author: govi20):
would it fine if I take this ticket?






[jira] [Created] (KAFKA-10865) Improve trace-logging for Transformations (including Predicates)

2020-12-18 Thread Robin Moffatt (Jira)
Robin Moffatt created KAFKA-10865:
-

 Summary: Improve trace-logging for Transformations (including 
Predicates)
 Key: KAFKA-10865
 URL: https://issues.apache.org/jira/browse/KAFKA-10865
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Robin Moffatt


I've been spending [a bunch of time poking around 
SMTs|https://rmoff.net/categories/twelvedaysofsmt/] recently, and one common 
challenge I've had is being able to debug when things don't behave as I expect.
  
 I know that there is the {{TransformationChain}} logger, but this only gives 
(IIUC) the input record
{code:java}
[2020-12-17 09:38:58,057] TRACE [sink-simulator-day12-00|task-0] Applying 
transformation io.confluent.connect.transforms.Filter$Value to 
SinkRecord{kafkaOffset=10551, timestampType=CreateTime} 
ConnectRecord{topic='day12-sys01', kafkaPartition=0, 
key=2c2ceb9b-8b31-4ade-a757-886ebfb7a398, keySchema=Schema{STRING}, 
value=Struct{units=16,product=Founders Breakfast 
Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01}, 
valueSchema=Schema{io.mdrogalis.Gen0:STRUCT}, timestamp=1608197938054, 
headers=ConnectHeaders(headers=)} 
(org.apache.kafka.connect.runtime.TransformationChain:47)
{code}
 
 I think it would be really useful to also have trace level logging that 
included:
 - the _output_ of *each* transform
 - the evaluation and result of any `predicate`s


I have been using 
{{com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkConnector}} 
which is really useful for seeing the final record:
{code:java}
[2020-12-17 09:38:58,057] INFO [sink-simulator-day12-00|task-0] 
record.value=Struct{units=16,product=Founders Breakfast 
Stout,amount=30.41,txn_date=Sat Dec 12 18:21:18 GMT 2020,source=SYS01} 
(com.github.jcustenborder.kafka.connect.simulator.SimulatorSinkTask:50)
{code}
 
 But it doesn't include things like the topic name (which is often changed by 
common SMTs).
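The request above is for trace logging of each transform's output and of predicate evaluations, not just the chain's input record. A minimal standalone Java sketch of what such per-step logging could look like (plain `UnaryOperator<String>` stands in for Connect's `Transformation`, and `System.out` stands in for the TRACE logger; names are illustrative, not Connect's actual internals):

```java
import java.util.List;
import java.util.function.UnaryOperator;

// Illustrative sketch only: logs the input AND output of every transform in
// the chain, and notes when a transform (e.g. a Filter) drops the record.
public class TraceChain {
    static String apply(List<UnaryOperator<String>> transforms, String record) {
        String current = record;
        for (int i = 0; i < transforms.size(); i++) {
            String before = current;
            current = transforms.get(i).apply(current);
            System.out.println("TRACE transform[" + i + "] in=" + before
                    + " out=" + current);
            if (current == null) { // the transform filtered the record out
                System.out.println("TRACE transform[" + i + "] dropped the record");
                return null;
            }
        }
        return current;
    }

    public static void main(String[] args) {
        String result = apply(
                List.of(s -> s.toUpperCase(), s -> s + "!"),
                "hello");
        System.out.println(result); // HELLO!
    }
}
```

The same pattern would also cover predicates: log the predicate's name and its boolean result before deciding whether to apply the guarded transform.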



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bertber edited a comment on pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-18 Thread GitBox


bertber edited a comment on pull request #9761:
URL: https://github.com/apache/kafka/pull/9761#issuecomment-748028169


   > Is 
`kafka.server.ControllerMutationQuotaTest.testPermissiveDeleteTopicsRequest` 
related to this PR?
   
   No, I don't think it is related to this PR.







[GitHub] [kafka] bertber commented on pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-18 Thread GitBox


bertber commented on pull request #9761:
URL: https://github.com/apache/kafka/pull/9761#issuecomment-748028169


   > Is 
`kafka.server.ControllerMutationQuotaTest.testPermissiveDeleteTopicsRequest` 
related to this PR?
   
   Yes, I think it is related to this PR.







[jira] [Comment Edited] (KAFKA-10843) Kafka Streams metadataForKey method returns null but allMetadata has the details

2020-12-18 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251690#comment-17251690
 ] 

Bruno Cadonna edited comment on KAFKA-10843 at 12/18/20, 11:02 AM:
---

I had another look at the code and I see two options {{metadataForKey()}} might 
return {{null}}.

1. There are no source topics in the topology for the given state store (see 
[code|https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L165]).
 2. Kafka Streams finds the wrong source topic partition for the given key (see 
[code|https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L411]).

I guess, option 1 does not apply since the same topology worked in your other 
environments.

Is your data partitioned differently in your PROD environment than in your 
other environments? Do you use a custom partitioner for the data in your PROD 
environment?


was (Author: cadonna):
I had another look at the code and I see two options {{metadataForKey()}} might 
return {{null}}.

1. There are no source topics in the topology for the given state store (see 
[code|https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L165]).
 2. Kafka Streams finds the wrong source topic partition for the given key (see 
[code|https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L411]).

I guess, option 1 does not apply since the same topology worked in your other 
environments.

Is your data partitioned differently in your PROD environment than in your 
other environments?

> Kafka Streams metadataForKey method returns null but allMetadata has the 
> details
> 
>
> Key: KAFKA-10843
> URL: https://issues.apache.org/jira/browse/KAFKA-10843
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.1
>Reporter: Maria Thomas
>Priority: Major
>
> Our application runs on multiple instances. To get the key information from 
> the state store, we use the "metadataForKey" method to retrieve the 
> StreamsMetadata and, using the hostname, make an RPC call to the host to get 
> the value associated with the key.
> This call was working fine in our DEV and TEST environments; however, it is 
> failing on one of our production clusters from the start. On further 
> debugging, I noticed the allMetadata() method was returning the state stores 
> with the host details as expected. However, it would not be feasible to go 
> through each store explicitly to get the key details.
> To note, the cluster I am using is a stretch cluster.





[jira] [Commented] (KAFKA-10843) Kafka Streams metadataForKey method returns null but allMetadata has the details

2020-12-18 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251690#comment-17251690
 ] 

Bruno Cadonna commented on KAFKA-10843:
---

I had another look at the code and I see two options {{metadataForKey()}} might 
return {{null}}.

1. There are no source topics in the topology for the given state store (see 
[code|https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L165]).
 2. Kafka Streams finds the wrong source topic partition for the given key (see 
[code|https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java#L411]).

I guess, option 1 does not apply since the same topology worked in your other 
environments.

Is your data partitioned differently in your PROD environment than in your 
other environments?
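Option 2 above typically means the producer partitioned the source topic differently from what Streams assumes: `metadataForKey(store, key, serializer)` computes the key's partition with the default partitioning, so a custom producer partitioner makes the lookup point at the wrong partition and return null. A self-contained Java sketch of the mismatch (hashing deliberately simplified; this is not Kafka's actual murmur2-based partitioner):

```java
// Illustrative only: shows how a custom producer partitioner can disagree
// with the default partitioning that metadataForKey(store, key, serde) assumes.
public class PartitionMismatch {
    // Stand-in for the default partitioning Streams uses for the lookup.
    static int defaultPartition(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    // Stand-in for a hypothetical custom partitioner used when producing.
    static int customPartition(String key, int numPartitions) {
        return key.length() % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        String key = "order-42";
        int assumed = defaultPartition(key, partitions);
        int actual = customPartition(key, partitions);
        // When these differ, Streams looks for the key's host on the wrong
        // partition, so metadataForKey() can come back null.
        System.out.println("assumed=" + assumed + " actual=" + actual);
    }
}
```

If this is the cause, the overload `KafkaStreams.metadataForKey(storeName, key, StreamPartitioner)` lets the caller supply the same partitioning that was used to write the source topic.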






[jira] [Commented] (KAFKA-9837) New RPC for notifying controller of failed replica

2020-12-18 Thread dengziming (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251619#comment-17251619
 ] 

dengziming commented on KAFKA-9837:
---

[GitHub Pull Request #9577|https://github.com/apache/kafka/pull/9577] is ready 
for review!

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Reporter: David Arthur
>Assignee: dengziming
>Priority: Major
> Fix For: 2.8.0
>
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.





[jira] [Commented] (KAFKA-10843) Kafka Streams metadataForKey method returns null but allMetadata has the details

2020-12-18 Thread Maria Thomas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10843?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17251609#comment-17251609
 ] 

Maria Thomas commented on KAFKA-10843:
--

[~cadonna] The stretch cluster is a multi-datacenter deployment, so our topics 
are not on different clusters but are replicated across different datacenters. 
So I didn't quite follow the second question. We have 6 bootstrap servers 
specified, 3 in each datacenter.


[~mjsax] Do you want me to update the kafka-streams version to 2.6.0? With 
regards to using allMetadata(), this still won't give me the particular 
instance hosting my key, right? It would give all the hosts where the store is 
present, but my key could be on any of them.






[GitHub] [kafka] tombentley commented on pull request #9735: KAFKA-10846: Grow buffer in FileSourceStreamTask only when needed

2020-12-18 Thread GitBox


tombentley commented on pull request #9735:
URL: https://github.com/apache/kafka/pull/9735#issuecomment-747954608


   @chia7712 thank _you_ for the reviews.







[GitHub] [kafka] Montyleo commented on pull request #9675: KAFKA-10794 Replica leader election is too slow in the case of too many partitions

2020-12-18 Thread GitBox


Montyleo commented on pull request #9675:
URL: https://github.com/apache/kafka/pull/9675#issuecomment-747942928


   > @chia7712 does the patch can resolve the issue ? I find the only 
differences is that controllerContext.allPartitions can be invoked once or the 
number of partition times . please correct me if I am wrong. thanks.
   
   Hi, lqjack
   
   Thanks for your question.
   There is a saying: quantitative change leads to qualitative change.
   When controllerContext.allPartitions is called too many times, the rebalance 
becomes too slow.
   I'll show you the effect after applying the PR: 1.3 ms vs 35541 ms.
   
   
![clipboard_image_1608279734070](https://user-images.githubusercontent.com/8037560/102591724-723fc080-414d-11eb-84ed-c3e1ca704f8c.png)
   
   
![image](https://user-images.githubusercontent.com/8037560/102591822-9ac7ba80-414d-11eb-9a15-e01b6b261892.png)
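The speedup shown above (1.3 ms vs 35541 ms) comes from evaluating the expensive `controllerContext.allPartitions` once, instead of once per partition inside the loop. A minimal Java sketch of that pattern, with illustrative names that are not Kafka's controller code:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch: hoisting an O(n) computation out of an O(n) loop
// turns O(n^2) total work into O(n).
public class HoistExpensiveCall {
    // Stands in for the expensive controllerContext.allPartitions call.
    static Set<String> allPartitions(List<String> clusterView) {
        return new HashSet<>(clusterView);
    }

    // Before: the expensive call is re-evaluated for every candidate.
    static long countSlow(List<String> candidates, List<String> clusterView) {
        return candidates.stream()
                .filter(p -> allPartitions(clusterView).contains(p))
                .count();
    }

    // After: evaluated once and reused across the whole loop.
    static long countFast(List<String> candidates, List<String> clusterView) {
        Set<String> all = allPartitions(clusterView);
        return candidates.stream().filter(all::contains).count();
    }

    public static void main(String[] args) {
        List<String> view = List.of("t-0", "t-1", "t-2");
        System.out.println(countFast(List.of("t-1", "x-9"), view)); // 1
    }
}
```

Both versions return the same result; only the number of times the expensive call runs changes, which is why the effect grows with the partition count.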
   
   







[GitHub] [kafka] chia7712 commented on pull request #9761: KAFKA-10768 Add a test for ByteBufferInputStream to ByteBufferLogInputStreamTest

2020-12-18 Thread GitBox


chia7712 commented on pull request #9761:
URL: https://github.com/apache/kafka/pull/9761#issuecomment-747942346


   Is 
```kafka.server.ControllerMutationQuotaTest.testPermissiveDeleteTopicsRequest```
 related to this PR?


