[GitHub] [kafka] vinothchandar commented on issue #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()

2020-04-21 Thread GitBox


vinothchandar commented on issue #8462:
URL: https://github.com/apache/kafka/pull/8462#issuecomment-617567564


   @guozhangwang So, I still need to add a test case around this specific 
scenario, tasks stuck in created state.. Seems it needs some engineering to 
create that scenario. (sophie gave me some pointers, yet to try them).. 
   
   if we need to just fix the test issue, I can open a simpler one with just 
the test fix.. That's probably better?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar commented on a change in pull request #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()

2020-04-21 Thread GitBox


vinothchandar commented on a change in pull request #8462:
URL: https://github.com/apache/kafka/pull/8462#discussion_r412690139



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -295,9 +295,10 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
 }
 });
 
-restartedStreams.start();
+// Wait till the restarted instance reaches running, after 
restoring
+
startApplicationAndWaitUntilRunning(Collections.singletonList(restartedStreams),
 Duration.ofSeconds(60));

Review comment:
   this is the actual fix for flaky test .. 
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9901) TimeoutError: Never saw message indicating StreamsTest finished startup on ducker@ducker07

2020-04-21 Thread jiamei xie (Jira)
jiamei xie created KAFKA-9901:
-

 Summary: TimeoutError: Never saw message indicating StreamsTest 
finished startup on ducker@ducker07
 Key: KAFKA-9901
 URL: https://issues.apache.org/jira/browse/KAFKA-9901
 Project: Kafka
  Issue Type: Bug
  Components: streams, system tests
Reporter: jiamei xie
Assignee: jiamei xie


When running  _DUCKTAPE_OPTIONS="--debug" 
TC_PATHS="tests/kafkatest/tests/streams/streams_broker_bounce_test.py::StreamsBrokerBounceTest.test_all_brokers_bounce"
 bash tests/docker/run_tests.sh | tee debug_logs.txt
It failed because of below error.
TimeoutError: Never saw message indicating StreamsTest finished startup on 
ducker@ducker07

https://github.com/apache/kafka/pull/8443 updated the constructor of 
StreamsSmokeTestJobRunnerService.  But it wasn't updated in 
streams_broker_bounce_test



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-21 Thread GitBox


ijuma commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r412680581



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment:
   Do we need this? Can we not pass the `time` from every caller?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc opened a new pull request #8528: Changed the system tests for --zookeeper flag removal

2020-04-21 Thread GitBox


d8tltanc opened a new pull request #8528:
URL: https://github.com/apache/kafka/pull/8528


   * Remove the --zookeeper flags for node versions supporting 
--bootstrap-server for TopicCommand.
   For the scram credential related code piece, switched to using 
--bootstrap-server even we don't support it yet.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-21 Thread GitBox


ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-617537261


   test this please
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ConcurrencyPractitioner commented on issue #7884: [KAFKA-8522] Streamline tombstone and transaction marker removal

2020-04-21 Thread GitBox


ConcurrencyPractitioner commented on issue #7884:
URL: https://github.com/apache/kafka/pull/7884#issuecomment-617537213


   ok to test
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] senthilm-ms commented on issue #8103: KAFKA-7061: KIP-280 Enhanced log compaction

2020-04-21 Thread GitBox


senthilm-ms commented on issue #8103:
URL: https://github.com/apache/kafka/pull/8103#issuecomment-617535197


   @junrao @guozhangwang can you please review and let me know if you have any 
more comments. would like to close and move on.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-9900:
---
Attachment: leak3.jpg

> Expired Sensor can not be remove,and it fills all the old generation,can't be 
> GC.
> -
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Chenxin
>Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg, leak3.jpg
>
>
> I tried kafka-client 2.1.0 and 2.1.1, but it exists both.
> I found that ,lots of Sensor and Metrics in memory.Cause it's too big, it's 
> in old Gen at last,and can not be GC.
> I want to disable the Metrics Collecotor, but I didn't find a way. Is this a 
> problem? Or just my wrong use case?
>  
> I'am a Chinesse, not very good at english.
>  
> Thanks.
>  
> !leak2.jpg|width=612,height=227!
> !Leak1.jpg|width=354,height=286!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-9900:
---
Description: 
I tried kafka-client 2.1.0 and 2.1.1, but it exists both.

I found that ,lots of Sensor and Metrics in memory.Cause it's too big, it's in 
old Gen at last,and can not be GC.

I want to disable the Metrics Collecotor, but I didn't find a way. Is this a 
problem? Or just my wrong use case?

 

I'am a Chinesse, not very good at english.

 

Thanks.

 

!leak2.jpg|width=612,height=227!

!Leak1.jpg|width=354,height=286!

 

There is another picture by jvisualvm 。

!leak3.jpg|width=1998,height=85!

  was:
I tried kafka-client 2.1.0 and 2.1.1, but it exists both.

I found that ,lots of Sensor and Metrics in memory.Cause it's too big, it's in 
old Gen at last,and can not be GC.

I want to disable the Metrics Collecotor, but I didn't find a way. Is this a 
problem? Or just my wrong use case?

 

I'am a Chinesse, not very good at english.

 

Thanks.

 

!leak2.jpg|width=612,height=227!

!Leak1.jpg|width=354,height=286!


> Expired Sensor can not be remove,and it fills all the old generation,can't be 
> GC.
> -
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Chenxin
>Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg, leak3.jpg
>
>
> I tried kafka-client 2.1.0 and 2.1.1, but it exists both.
> I found that ,lots of Sensor and Metrics in memory.Cause it's too big, it's 
> in old Gen at last,and can not be GC.
> I want to disable the Metrics Collecotor, but I didn't find a way. Is this a 
> problem? Or just my wrong use case?
>  
> I'am a Chinesse, not very good at english.
>  
> Thanks.
>  
> !leak2.jpg|width=612,height=227!
> !Leak1.jpg|width=354,height=286!
>  
> There is another picture by jvisualvm 。
> !leak3.jpg|width=1998,height=85!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc opened a new pull request #8527: Splitted unit tests for --zookeeper flag removal

2020-04-21 Thread GitBox


d8tltanc opened a new pull request #8527:
URL: https://github.com/apache/kafka/pull/8527


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on issue #8482: KAFKA-9863: update the deprecated --zookeeper option in the documentation into --bootstrap-server

2020-04-21 Thread GitBox


showuon commented on issue #8482:
URL: https://github.com/apache/kafka/pull/8482#issuecomment-617529478


   @junrao @omkreddy , could you please help review this PR? This is a small 
and straightforward change to update the documentation only, but I think it's 
important to users. Thank you very much.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] apovzner commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-21 Thread GitBox


apovzner commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-617525619


   @dajac Sounds good! I added unit tests to KafkaApisTest as you suggested.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-9900:
---
Description: 
I tried kafka-client 2.1.0 and 2.1.1, but it exists both.

I found that ,lots of Sensor and Metrics in memory.Cause it's too big, it's in 
old Gen at last,and can not be GC.

I want to disable the Metrics Collecotor, but I didn't find a way. Is this a 
problem? Or just my wrong use case?

 

I'am a Chinesse, not very good at english.

 

Thanks.

 

!leak2.jpg|width=612,height=227!

!Leak1.jpg|width=354,height=286!

  was:
 

!leak2.jpg|width=612,height=227!

!Leak1.jpg|width=354,height=286!


> Expired Sensor can not be remove,and it fills all the old generation,can't be 
> GC.
> -
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Chenxin
>Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg
>
>
> I tried kafka-client 2.1.0 and 2.1.1, but it exists both.
> I found that ,lots of Sensor and Metrics in memory.Cause it's too big, it's 
> in old Gen at last,and can not be GC.
> I want to disable the Metrics Collecotor, but I didn't find a way. Is this a 
> problem? Or just my wrong use case?
>  
> I'am a Chinesse, not very good at english.
>  
> Thanks.
>  
> !leak2.jpg|width=612,height=227!
> !Leak1.jpg|width=354,height=286!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-9900:
---
Attachment: leak2.jpg

> Expired Sensor can not be remove,and it fills all the old generation,can't be 
> GC.
> -
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Chenxin
>Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-9900:
---
Description: 
 

!leak2.jpg|width=612,height=227!

!Leak1.jpg|width=354,height=286!

> Expired Sensor can not be remove,and it fills all the old generation,can't be 
> GC.
> -
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Chenxin
>Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg
>
>
>  
> !leak2.jpg|width=612,height=227!
> !Leak1.jpg|width=354,height=286!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-9900:
---
Attachment: Leak1.jpg

> Expired Sensor can not be remove,and it fills all the old generation,can't be 
> GC.
> -
>
> Key: KAFKA-9900
> URL: https://issues.apache.org/jira/browse/KAFKA-9900
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Chenxin
>Priority: Critical
> Attachments: Leak1.jpg, leak2.jpg
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9900) Expired Sensor can not be remove,and it fills all the old generation,can't be GC.

2020-04-21 Thread Chenxin (Jira)
Chenxin created KAFKA-9900:
--

 Summary: Expired Sensor can not be remove,and it fills all the old 
generation,can't be GC.
 Key: KAFKA-9900
 URL: https://issues.apache.org/jira/browse/KAFKA-9900
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.1.1, 2.1.0
Reporter: Chenxin
 Attachments: Leak1.jpg





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-21 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412633489



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
##
@@ -547,17 +547,15 @@ public void testMaybeCompleteValidationAfterOffsetReset() 
{
 int initialOffsetEpoch = 5;
 
 SubscriptionState.FetchPosition initialPosition = new 
SubscriptionState.FetchPosition(initialOffset,
-Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));
+Optional.of(initialOffsetEpoch), new 
Metadata.LeaderAndEpoch(Optional.of(broker1), Optional.of(currentEpoch)));

Review comment:
   Only side cleanups





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-21 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412633349



##
File path: clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
##
@@ -199,19 +200,21 @@ public void 
testIgnoreLeaderEpochInOlderMetadataResponse() {
 MetadataResponse response = new MetadataResponse(struct, version);
 assertFalse(response.hasReliableLeaderEpochs());
 metadata.updateWithCurrentRequestVersion(response, false, 100);
+assertFalse(metadata.hasReliableLeaderEpochs());

Review comment:
   Added test coverage L203 and L214, other changes are just side cleanups 
and signature refactoring in this file.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-21 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412632269



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
##
@@ -85,10 +85,6 @@ protected OffsetForEpochResult handleResponse(
 case KAFKA_STORAGE_ERROR:
 case OFFSET_NOT_AVAILABLE:
 case LEADER_NOT_AVAILABLE:
-logger().debug("Attempt to fetch offsets for partition {} 
failed due to {}, retrying.",

Review comment:
   Side cleanup





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-21 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412632493



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
##
@@ -464,22 +466,41 @@ public static MetadataResponse prepareResponse(int 
throttleTimeMs, Collection

[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-21 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412632092



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request 
to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+ */
+private void validateOffsetsAsync(Map partitionsToValidate) {
+final Map> 
regrouped =
+regroupFetchPositionsByLeader(partitionsToValidate);
+
+regrouped.forEach((node, fetchPositions) -> {
+if (node.isEmpty()) {
+metadata.requestUpdate();
+return;
+}
+
+NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+if (nodeApiVersions == null) {
+client.tryConnect(node);
+return;
+}
+
+if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+log.debug("Skipping validation of fetch offsets for partitions 
{} since the broker does not " +
+  "support the required protocol version 
(introduced in Kafka 2.3)",
+fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+// We need to get the client epoch state before sending out the 
leader epoch request, and use it to
+// decide whether we need to validate offsets.
+if (!metadata.hasReliableLeaderEpochs()) {
+log.debug("Skipping validation of fetch offsets for partitions 
{} since the provided leader broker " +
+  "is not reliable", fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+subscriptions.setNextAllowedRetry(fetchPositions.keySet(), 
time.milliseconds() + requestTimeoutMs);
+
+RequestFuture 
future =
+offsetsForLeaderEpochClient.sendAsyncRequest(node, 
fetchPositions);
+
+future.addListener(new 
RequestFutureListener() {
+@Override
+public void 
onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
+Map 
truncationWithoutResetPolicy = new HashMap<>();
+if (!offsetsResult.partitionsToRetry().isEmpty()) {
+
subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), 
time.milliseconds() + retryBackoffMs);
+metadata.requestUpdate();
+}
+
+// For each OffsetsForLeader response, check if the 
end-offset is lower than our current offset
+// for the partition. If so, it means we have experienced 
log truncation and need to reposition
+// that partition's offset.
+//
+// In addition, check whether the returned offset and 
epoch are valid. If not, then we should treat
+// it as out of range and update metadata for rediscovery.
+offsetsResult.endOffsets().forEach((respTopicPartition, 
respEndOffset) -> {
+if (respEndOffset.hasUndefinedEpochOrOffset()) {

Review comment:
   Change two: do not complete the validation as the returned epoch or 
offset is invalid. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-21 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412631698



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request 
to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+ */
+private void validateOffsetsAsync(Map partitionsToValidate) {

Review comment:
   Move the function closer to its caller.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-21 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r412631879



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request 
to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+ */
+private void validateOffsetsAsync(Map partitionsToValidate) {
+final Map> 
regrouped =
+regroupFetchPositionsByLeader(partitionsToValidate);
+
+regrouped.forEach((node, fetchPositions) -> {
+if (node.isEmpty()) {
+metadata.requestUpdate();
+return;
+}
+
+NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+if (nodeApiVersions == null) {
+client.tryConnect(node);
+return;
+}
+
+if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+log.debug("Skipping validation of fetch offsets for partitions 
{} since the broker does not " +
+  "support the required protocol version 
(introduced in Kafka 2.3)",
+fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+// We need to get the client epoch state before sending out the 
leader epoch request, and use it to
+// decide whether we need to validate offsets.
+if (!metadata.hasReliableLeaderEpochs()) {

Review comment:
   Change one: immediately complete the validation when the leader epoch is 
not reliable.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-2419) Allow certain Sensors to be garbage collected after inactivity

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-2419:
---
Attachment: leak2.jpg

> Allow certain Sensors to be garbage collected after inactivity
> --
>
> Key: KAFKA-2419
> URL: https://issues.apache.org/jira/browse/KAFKA-2419
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Aditya Auradkar
>Assignee: Aditya Auradkar
>Priority: Blocker
>  Labels: quotas
> Fix For: 0.9.0.0
>
> Attachments: Leak1, Leak1.jpg, leak2.jpg
>
>
> Currently, metrics cannot be removed once registered. 
> Implement a feature to remove certain sensors after a certain period of 
> inactivity (perhaps configurable).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-2419) Allow certain Sensors to be garbage collected after inactivity

2020-04-21 Thread Chenxin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089231#comment-17089231
 ] 

Chenxin commented on KAFKA-2419:


Execuse me. Is this bug fix now? I just see a ExpireSensorTask in Metrics, but 
nowhere can use. Then I see beyond 800,000 sensors existed in my application, 
and it is in Old generation ,can't be GC.
I use kafka-clients version 2.1.1 .

!Leak1.jpg|width=217,height=175!

!leak2.jpg|width=782,height=290!

> Allow certain Sensors to be garbage collected after inactivity
> --
>
> Key: KAFKA-2419
> URL: https://issues.apache.org/jira/browse/KAFKA-2419
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Aditya Auradkar
>Assignee: Aditya Auradkar
>Priority: Blocker
>  Labels: quotas
> Fix For: 0.9.0.0
>
> Attachments: Leak1, Leak1.jpg, leak2.jpg
>
>
> Currently, metrics cannot be removed once registered. 
> Implement a feature to remove certain sensors after a certain period of 
> inactivity (perhaps configurable).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-2419) Allow certain Sensors to be garbage collected after inactivity

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-2419:
---
Attachment: Leak1.jpg

> Allow certain Sensors to be garbage collected after inactivity
> --
>
> Key: KAFKA-2419
> URL: https://issues.apache.org/jira/browse/KAFKA-2419
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Aditya Auradkar
>Assignee: Aditya Auradkar
>Priority: Blocker
>  Labels: quotas
> Fix For: 0.9.0.0
>
> Attachments: Leak1, Leak1.jpg, leak2.jpg
>
>
> Currently, metrics cannot be removed once registered. 
> Implement a feature to remove certain sensors after a certain period of 
> inactivity (perhaps configurable).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-2419) Allow certain Sensors to be garbage collected after inactivity

2020-04-21 Thread Chenxin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chenxin updated KAFKA-2419:
---
Attachment: Leak1

> Allow certain Sensors to be garbage collected after inactivity
> --
>
> Key: KAFKA-2419
> URL: https://issues.apache.org/jira/browse/KAFKA-2419
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.9.0.0
>Reporter: Aditya Auradkar
>Assignee: Aditya Auradkar
>Priority: Blocker
>  Labels: quotas
> Fix For: 0.9.0.0
>
> Attachments: Leak1
>
>
> Currently, metrics cannot be removed once registered. 
> Implement a feature to remove certain sensors after a certain period of 
> inactivity (perhaps configurable).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] iceChen8123 commented on issue #233: KAFKA-2419; Garbage collect unused sensors

2020-04-21 Thread GitBox


iceChen8123 commented on issue #233:
URL: https://github.com/apache/kafka/pull/233#issuecomment-617512892


   Execuse me. Is this bug fix now? I just see a ExpireSensorTask in Metrics, 
but nowhere can use. Then I see beyond 800,000 sensors existed in my 
application, and it is in Old generation ,can't be GC. 
   I use kafka-clients version 2.1.1 .
   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2020-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089214#comment-17089214
 ] 

Matthias J. Sax commented on KAFKA-8924:


{quote}there is no sensible default grace period
{quote}
This is of course subjective, but I would be personally fine with a default of 
zero. Even if not very common, there are use-cases for which data is actually 
ordered (in fact, each time when one uses wall clock time); thus, I don't 
consider it "crazy" to use a default of zero.

Furthermore, for simple demos and when people start to build their first apps, 
it seem desirable to have a few mandatory parameters as possible to make it 
easier to get started?

I see your argument that for a real production use case, one might want to set 
a custom grace-period with high probability and thus, I could be convinced to 
make it mandatory. It's just a slight preference of mine to use a default of 
zero.

What I like most about the proposal is, that we could just fix it in 2.7. We 
should have thought about this earlier and should have fix it in the same 
release `suppress()` was introduces. Well, better a little delayed than even 
more delayed (ie, to 3.0). It would be great to get a KIP for this and get it 
into 2.7, release.

> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project, however, I am not aware of the impact it would have done 
> due to the changes in the *gracePeriodMs* method mentioned before.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9898) Flaky Test StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores

2020-04-21 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9898:
---
Component/s: unit tests

> Flaky Test StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores
> ---
>
> Key: KAFKA-9898
> URL: https://issues.apache.org/jira/browse/KAFKA-9898
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test, unit-test
>
> h3. Stacktrace
> java.lang.AssertionError: Expected: is  but: was  at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:233)
>  
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5900/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQueryAllStalePartitionStores/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on issue #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()

2020-04-21 Thread GitBox


guozhangwang commented on issue #8462:
URL: https://github.com/apache/kafka/pull/8462#issuecomment-617488326


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] andrewchoi5 commented on issue #8479: KAFKA-9769: Finish operations for leaderEpoch-updated partitions up to point ZK Exception

2020-04-21 Thread GitBox


andrewchoi5 commented on issue #8479:
URL: https://github.com/apache/kafka/pull/8479#issuecomment-617479063


   Thanks for referring Matthias. Would appreciate your review @hachikuji 
@cmccabe 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9899) LogCleaner Tries To Clean Single Partition Over 1000x/Minute

2020-04-21 Thread Jeff Nadler (Jira)
Jeff Nadler created KAFKA-9899:
--

 Summary: LogCleaner Tries To Clean Single Partition Over 
1000x/Minute
 Key: KAFKA-9899
 URL: https://issues.apache.org/jira/browse/KAFKA-9899
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.4.1
 Environment: ubuntu bionic, openjdk11.0.6, kafka 2.4.1
Reporter: Jeff Nadler
 Attachments: CPU-Usage.png

I had previously believed this to be the same issue as KAFKA-8764, but I took a 
closer look when it persisted after upgrading to 2.4.1 and now believe this is 
a different bug.

For a topic that is a very low traffic, compact topic the log cleaner will 
sometimes - for a period of usually 2 hours or longer - get stuck in a loop 
where it tries to clean the same partition for the same offset range nonstop, 
and the log cleaner thread consumes 100% of a single core during this time.
h4. 1396 attempts in a single minute:

 

{{root@stage-obs-kafka01:/var/log/kafka# cat log-cleaner.log | grep 22:22: | 
grep "offset range" | wc -l}}

{{1396}}

 
h4. All 1396 of these are looking at the same partition and same offset range:

{{[2020-04-21 22:22:59,862] INFO Cleaner 0: Building offset map for log 
elauneind-firebolt-messages-sfo-0 for 0 segments in offset range [22943108, 
22912825). (kafka.log.LogCleaner)}}

 

These attempts are separated by on average only 30ms.   This is a small 3 node 
cluster, note that the CPU graph attached is very clearly bimodal for each 
node:   low when the log cleaner is not "stuck", and much higher when it is.

Eventually the log cleaner appears to find a segment to clean (because enough 
traffic has arrived?) and the loop is broken... for a time.   Note that it 
finds "1 segments" and then finally moves on to check other topic-partitions.

{{...tens of thousands of this first one then}}

{{[2020-04-21 20:06:02,531] INFO Cleaner 0: Building offset map for log 
elauneind-firebolt-messages-sfo-0 for 0 segments in *offset range* [23591841, 
23575583). (kafka.log.LogCleaner)}}{{[2020-04-21 20:06:02,567] INFO Cleaner 0: 
Building offset map for log elauneind-firebolt-messages-sfo-0 for 1 segments in 
*offset range* [23591841, 23621641). (kafka.log.LogCleaner)}}{{[2020-04-21 
20:43:04,309] INFO Cleaner 0: Building offset map for log 
elauneind-firebolt-messages-s2r1-0 for 1 segments in *offset range* [2687968, 
2732498). (kafka.log.LogCleaner)}}

 
h4. The topic gets about 100 messsages/minute, and it's config is:

{{Topic: elauneind-firebolt-messages-sfo PartitionCount: 1 ReplicationFactor: 3 
Configs: 
min.insync.replicas=1,cleanup.policy=compact,delete,segment.bytes=10240,retention.ms=90,message.format.version=2.3-IV1,min.compaction.lag.ms=30,min.cleanable.dirty.ratio=0.2,unclean.leader.election.enable=true,retention.bytes=1073741824}}{{
 Topic: elauneind-firebolt-messages-sfo Partition: 0 Leader: 0 Replicas: 0,2,1 
Isr: 0,1,2}}

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-04-21 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089107#comment-17089107
 ] 

Sophie Blee-Goldman commented on KAFKA-7965:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5900/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/]

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 1.1.1, 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: David Jacot
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9898) Flaky Test StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores

2020-04-21 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-9898:
--

 Summary: Flaky Test 
StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores
 Key: KAFKA-9898
 URL: https://issues.apache.org/jira/browse/KAFKA-9898
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0
Reporter: Sophie Blee-Goldman


h3. Stacktrace

java.lang.AssertionError: Expected: is  but: was  at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores(StoreQueryIntegrationTest.java:233)

 

[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5900/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQueryAllStalePartitionStores/]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617439325


   Unrelated java 11 failures:
   
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   
org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9335) java.lang.IllegalArgumentException: Number of partitions must be at least 1.

2020-04-21 Thread Yuexi Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089096#comment-17089096
 ] 

Yuexi Liu commented on KAFKA-9335:
--

[~vveeramani] I test it, the 2.4.1 fixed it

> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
> 
>
> Key: KAFKA-9335
> URL: https://issues.apache.org/jira/browse/KAFKA-9335
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Nitay Kufert
>Assignee: Boyang Chen
>Priority: Blocker
>  Labels: bug
> Fix For: 2.4.1
>
>
> Hey,
> When trying to upgrade our Kafka streams client to 2.4.0 (from 2.3.1) we 
> encountered the following exception: 
> {code:java}
> java.lang.IllegalArgumentException: Number of partitions must be at least 1.
> {code}
> It's important to notice that the exact same code works just fine at 2.3.1.
>  
> I have created a "toy" example which reproduces this exception:
> [https://gist.github.com/nitayk/50da33b7bcce19ad0a7f8244d309cb8f]
> and I would love to get some insight regarding why its happening / ways to 
> get around it
>  
> Thanks



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2020-04-21 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089090#comment-17089090
 ] 

John Roesler commented on KAFKA-8924:
-

Thanks for the comment, [~cadonna] ,

I agree that it's not as elegant-looking this way, but the point is that if we 
make it two separate methods, then we _have to_ select a default. I'm leaning 
toward the viewpoint that there is no sensible default grace period, therefore 
it's got to be a required parameter, which means that it must be present in the 
"root" factory method arguments.

I can see now why other stream processing systems attempt to adaptively "learn" 
the grace period by observing the input stream's lateness, but I'm still on the 
fence about whether that's really a good "default" either.

> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project, however, I am not aware of the impact it would have done 
> due to the changes in the *gracePeriodMs* method mentioned before.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617423885


   I hope I didn't step on your toes, @ConcurrencyPractitioner , but I just 
wanted to make sure that you're unblocked to finish up this PR. Figuring out 
what's exactly wrong with those tests and whether it's ok can be a bit subtle.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617423139


   Ok, last one. The TransformValuesTest is just another case where the test 
input data is now considered idempotent, which is fine:
   
   ```diff
   --- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
   +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
   @@ -398,8 +398,8 @@ public class KTableTransformValuesTest {
driver.createInputTopic(INPUT_TOPIC, new 
StringSerializer(), new StringSerializer());

inputTopic.pipeInput("A", "ignored", 5L);
   -inputTopic.pipeInput("A", "ignored", 15L);
   -inputTopic.pipeInput("A", "ignored", 10L);
   +inputTopic.pipeInput("A", "ignored1", 15L);
   +inputTopic.pipeInput("A", "ignored2", 10L);

assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "1", 5),
new KeyValueTimestamp<>("A", "0", 15),
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617418517


   The source topic restart integration test was actually just failing because 
the tests were polluting each others' topics. This is one way to fix it:
   
   ```diff
   diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
   index 3ec239fab9..b42a5852a5 100644
   --- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
   +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
   @@ -100,6 +100,7 @@ public class KTableSourceTopicRestartIntegrationTest {
@After
public void after() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
   +CLUSTER.deleteAllTopicsAndWait(60_000L);
}

@Test
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9897) Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores

2020-04-21 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9897?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089052#comment-17089052
 ] 

Matthias J. Sax commented on KAFKA-9897:


[~NaviBrar] – would you be interested to look into this one? You should have 
some context about the test. Would be nice if we could find and fix the root 
cause of the flakiness.

> Flaky Test StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores
> -
>
> Key: KAFKA-9897
> URL: https://issues.apache.org/jira/browse/KAFKA-9897
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.6.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/22/testReport/junit/org.apache.kafka.streams.integration/StoreQueryIntegrationTest/shouldQuerySpecificActivePartitionStores/]
> {quote}org.apache.kafka.streams.errors.InvalidStateStoreException: Cannot get 
> state store source-table because the stream thread is PARTITIONS_ASSIGNED, 
> not RUNNING at 
> org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider.stores(StreamThreadStateStoreProvider.java:85)
>  at 
> org.apache.kafka.streams.state.internals.QueryableStoreProvider.getStore(QueryableStoreProvider.java:61)
>  at org.apache.kafka.streams.KafkaStreams.store(KafkaStreams.java:1183) at 
> org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQuerySpecificActivePartitionStores(StoreQueryIntegrationTest.java:178){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617392150


   I also took a look at the foreign-key join test, which is actually telling 
us something awesome: your feature allows us to drop _unnecessary_ tombstones 
that we'd otherwise send under some conditions.
   
   Anyway, it's complicated, so here's a fix for the test:
   ```diff
   --- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
   +++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
   @@ -48,6 +48,7 @@ import java.util.Properties;
import java.util.function.Function;

import static java.util.Collections.emptyMap;
   +import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
   @@ -371,12 +372,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest 
{

// Deleting a non-joining record produces an unnecessary 
tombstone for inner joins, because
// it's not possible to know whether a result was previously 
emitted.
   +// HOWEVER, when the final join result is materialized (either 
explicitly or
   +// implicitly by a subsequent join), we _can_ detect that the 
tombstone is unnecessary and drop it.
// For the left join, the tombstone is necessary.
left.pipeInput("lhs1", (String) null);
{
assertThat(
outputTopic.readKeyValuesToMap(),
   -is(mkMap(mkEntry("lhs1", null)))
   +is(leftJoin || !(materialized || rejoin)
   +   ? mkMap(mkEntry("lhs1", null))
   +   : emptyMap())
);
if (materialized) {
assertThat(
   @@ -452,12 +457,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest 
{
}
// "moving" our subscription to another non-existent FK results 
in an unnecessary tombstone for inner join,
// since it impossible to know whether the prior FK existed or 
not (and thus whether any results have
   -// previously been emitted)
   +// previously been emitted). HOWEVER, when the final join 
result is materialized (either explicitly or
   +// implicitly by a subsequent join), we _can_ detect that the 
tombstone is unnecessary and drop it.
// The left join emits a _necessary_ update (since the lhs 
record has actually changed)
left.pipeInput("lhs1", "lhsValue1|rhs2");
assertThat(
outputTopic.readKeyValuesToMap(),
   -is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" 
: null)))
   +is(leftJoin
   +   ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
   +   : (materialized || rejoin) ? emptyMap() : 
singletonMap("lhs1", null))
);
if (materialized) {
assertThat(
   @@ -469,7 +477,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs1", "lhsValue1|rhs3");
assertThat(
outputTopic.readKeyValuesToMap(),
   -is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" 
: null)))
   +is(leftJoin
   +   ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
   +   : (materialized || rejoin) ? emptyMap() : 
singletonMap("lhs1", null))
);
if (materialized) {
assertThat(
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9882) Add Block getAssignments()

2020-04-21 Thread Jesse Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089016#comment-17089016
 ] 

Jesse Anderson commented on KAFKA-9882:
---

Seeking each time was just an example. Adding a config wouldn't handle all 
cases. For example, it would handle seeking to a specific offset based on which 
partition it was assigned.

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Jesse Anderson
>Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a 
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting 
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop 
> and check the size of assignment(). This isn't a great method of verifying 
> the consumer assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on issue #8520: Add explicit grace period to tumbling window example

2020-04-21 Thread GitBox


vvcephei commented on issue #8520:
URL: https://github.com/apache/kafka/pull/8520#issuecomment-617376010


   Gah, I forgot to add the reviewers to the merge commit.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] surabhidixit opened a new pull request #8526: KAFKA-6867: corrected the typos in upgrade.html

2020-04-21 Thread GitBox


surabhidixit opened a new pull request #8526:
URL: https://github.com/apache/kafka/pull/8526


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617372772


   Hey @ConcurrencyPractitioner , sorry it took so long. It's just again 
because the test happened to expect idempotent updates to flow through 
regularly, but not for anything important. Just changing the value of the 
"tick" record the second time fixes it without breaking anything about the test.
   
   Here's my diff:
   ```diff
   --- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
   +++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
   @@ -615,11 +615,11 @@ public class SuppressScenarioTest {
);


   -inputTopicRight.pipeInput("tick", "tick", 21L);
   +inputTopicRight.pipeInput("tick", "tick1", 21L);
verify(
drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
asList(
   -new KeyValueTimestamp<>("tick", "(null,tick)", 21), // 
just a testing artifact
   +new KeyValueTimestamp<>("tick", "(null,tick1)", 21), // 
just a testing artifact
new KeyValueTimestamp<>("A", "(b,2)", 13L)
)
);
   @@ -703,11 +703,11 @@ public class SuppressScenarioTest {
);


   -inputTopicLeft.pipeInput("tick", "tick", 21L);
   +inputTopicLeft.pipeInput("tick", "tick1", 21L);
verify(
drainProducerRecords(driver, "output", STRING_DESERIALIZER, 
STRING_DESERIALIZER),
asList(
   -new KeyValueTimestamp<>("tick", "(tick,null)", 21), // 
just a testing artifact
   +new KeyValueTimestamp<>("tick", "(tick1,null)", 21), // 
just a testing artifact
new KeyValueTimestamp<>("A", "(2,b)", 13L)
)
);
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9882) Add Block getAssignments()

2020-04-21 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17089006#comment-17089006
 ] 

Boyang Chen commented on KAFKA-9882:


So you are trying to reset offset every time when you restart the consumer? If 
that's the case, we could get a config like `init.offset.reset` which enforces 
the reset every time we restart the consumer.

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Jesse Anderson
>Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a 
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting 
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop 
> and check the size of assignment(). This isn't a great method of verifying 
> the consumer assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617364803


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617364180


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


ableegoldman commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412424861



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue clientsByTaskLoad;
+private final BiFunction validClientCriteria;
+private final Set uniqueClients = new HashSet<>();
+
+ValidClientsByTaskLoadQueue(final Map clientStates,
+final BiFunction 
validClientCriteria) {
+clientsByTaskLoad = getClientPriorityQueueByTaskLoad(clientStates);
+this.validClientCriteria = validClientCriteria;
+}
+
+/**
+= * @return the next least loaded client that satisfies the given 
criteria, or null if none do
+ */
+UUID poll(final TaskId task) {
+final List validClient = poll(task, 1);
+return validClient.isEmpty() ? null : validClient.get(0);
+}
+
+/**
+ * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid
+ * candidates for the given task
+ */
+List poll(final TaskId task, final int numClients) {
+final List nextLeastLoadedValidClients = new LinkedList<>();
+final Set invalidPolledClients = new HashSet<>();
+while (nextLeastLoadedValidClients.size() < numClients) {
+UUID candidateClient;
+while (true) {
+candidateClient = clientsByTaskLoad.poll();
+if (candidateClient == null) {
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+if (validClientCriteria.apply(candidateClient, task)) {
+nextLeastLoadedValidClients.add(candidateClient);
+break;
+} else {
+invalidPolledClients.add(candidateClient);
+}
+}
+}
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+void offerAll(final Collection clients) {
+for (final UUID client : clients) {
+offer(client);
+}
+}
+
+void offer(final UUID client) {
+if (uniqueClients.contains(client)) {

Review comment:
   @cadonna you're right, I forgot to remove from `uniqueClients` in poll. 
Good catch





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412424445



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue clientsByTaskLoad;
+private final BiFunction validClientCriteria;
+private final Set uniqueClients = new HashSet<>();
+
+ValidClientsByTaskLoadQueue(final Map clientStates,
+final BiFunction 
validClientCriteria) {
+clientsByTaskLoad = getClientPriorityQueueByTaskLoad(clientStates);
+this.validClientCriteria = validClientCriteria;
+}
+
+/**
+= * @return the next least loaded client that satisfies the given 
criteria, or null if none do
+ */
+UUID poll(final TaskId task) {
+final List validClient = poll(task, 1);
+return validClient.isEmpty() ? null : validClient.get(0);
+}
+
+/**
+ * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid
+ * candidates for the given task
+ */
+List poll(final TaskId task, final int numClients) {
+final List nextLeastLoadedValidClients = new LinkedList<>();
+final Set invalidPolledClients = new HashSet<>();
+while (nextLeastLoadedValidClients.size() < numClients) {
+UUID candidateClient;
+while (true) {
+candidateClient = clientsByTaskLoad.poll();
+if (candidateClient == null) {
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+if (validClientCriteria.apply(candidateClient, task)) {
+nextLeastLoadedValidClients.add(candidateClient);
+break;
+} else {
+invalidPolledClients.add(candidateClient);
+}
+}
+}
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+void offerAll(final Collection clients) {
+for (final UUID client : clients) {
+offer(client);
+}
+}
+
+void offer(final UUID client) {
+if (uniqueClients.contains(client)) {

Review comment:
   Gah! You're right. We should also _remove_ the client from 
`uniqueClients` when we `poll`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412423011



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue clientsByTaskLoad;
+private final BiFunction validClientCriteria;
+
+ValidClientsByTaskLoadQueue(final Map clientStates,
+final BiFunction 
validClientCriteria) {

Review comment:
   Ah, sorry about that @ableegoldman ; I wasn't able (or was too lazy) to 
follow the `git praise` trail through the class movement. Well, kudos to you, 
then. :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412422082



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,103 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
 
-final TaskId task;
-final UUID source;
-final UUID destination;
+class TaskMovement {
+private final TaskId task;
+private final UUID destination;
+private final SortedSet caughtUpClients;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+private TaskMovement(final TaskId task, final UUID destination, final 
SortedSet caughtUpClients) {
 this.task = task;
-this.source = source;
 this.destination = destination;
-}
+this.caughtUpClients = caughtUpClients;
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
+if (caughtUpClients == null || caughtUpClients.isEmpty()) {
+throw new IllegalStateException("Should not attempt to move a task 
if no caught up clients exist");
 }
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
 }
 
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad = new 
ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 

[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412419973



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = 
LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) 
{
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the 
balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica 
on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source 
clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained 
assignment
- * 2. Tasks whose destination client previously had this task as a standby 
will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that 
task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained 
assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced 
assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to 
balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> 
statefulActiveTaskAssignment,
-   final Map> 
balancedStatefulActiveTaskAssignment,
-   final Map> 
tasksToCaughtUpClients,
-   final Map 
clientStates,
-   final Map 
tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != 
balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but 
assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> 
statefulActiveTaskAssignment,
+   final Map> 
tasksToCaughtUpClients,
+   final Map 
clientStates,
+   final Map 
tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, 
tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 
balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = 
tasksToCaughtUpClients.get(movement.task).size();
+final int 

[jira] [Commented] (KAFKA-9882) Add Block getAssignments()

2020-04-21 Thread Jesse Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088922#comment-17088922
 ] 

Jesse Anderson commented on KAFKA-9882:
---

{code:java}
// Create KafkaConsumer and subscribe

// Call blocking getter for partition assignment
java.util.Set assignment = consumer.getAssignment();

// Seek to end of topic
consumer.seekToEnd​(assignment);

// Seek to beginning of topic
consumer.seekToBeginning​(assignment);

// Start polling{code}

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Jesse Anderson
>Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a 
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting 
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop 
> and check the size of assignment(). This isn't a great method of verifying 
> the consumer assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9882) Add Block getAssignments()

2020-04-21 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088917#comment-17088917
 ] 

Boyang Chen commented on KAFKA-9882:


I see, to make the discussion more effective, could you draft a sample code 
skeleton assuming we have introduced a new blocking API for the above use case?

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Jesse Anderson
>Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a 
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting 
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop 
> and check the size of assignment(). This isn't a great method of verifying 
> the consumer assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9882) Add Block getAssignments()

2020-04-21 Thread Jesse Anderson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088908#comment-17088908
 ] 

Jesse Anderson commented on KAFKA-9882:
---

subscribe() doesn't get a partition assignment until poll() is called. I 
double-checked this just now.

Using this method would lead to quite a bit of complicated code. The listener 
code would have to maintain a boolean to check if this is the first time the 
onPartitionsAssigned method was called. It would have to flip the first time it 
was called.

I think a more straightforward approach to getting partition assignments would 
result in more readable and less buggy code.

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Jesse Anderson
>Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a 
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting 
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop 
> and check the size of assignment(). This isn't a great method of verifying 
> the consumer assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9882) Add Block getAssignments()

2020-04-21 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9882?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1708#comment-1708
 ] 

Boyang Chen commented on KAFKA-9882:


I checked back on the original KIP 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-266%3A+Fix+consumer+indefinite+blocking+behavior]
So it seems the original intention was to resolve the indefinite blocking. For 
your case, have you considered using
`public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)` 
which can plug in a user-callback to wait for assignment result?

> Add Block getAssignments()
> --
>
> Key: KAFKA-9882
> URL: https://issues.apache.org/jira/browse/KAFKA-9882
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.5.0
>Reporter: Jesse Anderson
>Priority: Critical
>
> In 2.0, the KafkaConsumer poll(long) was deprecated and replaced with a 
> poll(Duration). The poll(Duration) does not block for consumer assignments.
> Now, there isn't a blocking method that can get consumer assignments.
> A new KafkaConsumer method needs to be added that blocks while getting 
> consumer assignments.
> The current workaround is to poll for a short amount of time in a while loop 
> and check the size of assignment(). This isn't a great method of verifying 
> the consumer assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-21 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r412313524



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,23 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+time.nanoseconds < dueNs
   }
 
-  def compareTo(d: Delayed): Int = {
-val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+  def compareTo(d: DelayedItem): Int = {

Review comment:
   Done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-21 Thread GitBox


ijuma commented on a change in pull request #8417:
URL: https://github.com/apache/kafka/pull/8417#discussion_r412309101



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
##
@@ -57,6 +59,10 @@ public ByteBuffer serialize(ApiKeys apiKey, short version, 
int correlationId) {
 return Collections.singletonMap(error, 1);
 }
 
+protected Map errorCounts(Stream errors) {
+return errors.collect(Collectors.groupingBy(e -> e, 
Collectors.summingInt(e -> 1)));

Review comment:
   Thanks! I think the way you have it now is both fast and concise, not 
worth changing it to use `forEach` given how this is normally used.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-21 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r412305902



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,23 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+time.nanoseconds < dueNs
   }
 
-  def compareTo(d: Delayed): Int = {
-val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+  def compareTo(d: DelayedItem): Int = {

Review comment:
   Makes sense. I'll remove this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on issue #8508: MINOR: Improve usage of LogCaptureAppender

2020-04-21 Thread GitBox


mjsax commented on issue #8508:
URL: https://github.com/apache/kafka/pull/8508#issuecomment-617266065


   Java 8 and Java 11 passed.
   Java 14: failed with unknown error...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617266153


   Test this, please. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617265836


   Test this, please. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617264248


   Test this, please. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9850) Move KStream#repartition operator validation during Topology build process

2020-04-21 Thread HaiyuanZhao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

HaiyuanZhao reassigned KAFKA-9850:
--

Assignee: HaiyuanZhao

> Move KStream#repartition operator validation during Topology build process 
> ---
>
> Key: KAFKA-9850
> URL: https://issues.apache.org/jira/browse/KAFKA-9850
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Levani Kokhreidze
>Assignee: HaiyuanZhao
>Priority: Major
>  Labels: help-wanted, newbie, newbie++
>
> `KStream#repartition` operation performs most of its validation regarding 
> joining, co-partitioning, etc after starting Kafka Streams instance. Some 
> parts of this validation can be detected much earlier, specifically during 
> topology `build()`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-21 Thread GitBox


tombentley commented on a change in pull request #8417:
URL: https://github.com/apache/kafka/pull/8417#discussion_r412287986



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
##
@@ -57,6 +59,10 @@ public ByteBuffer serialize(ApiKeys apiKey, short version, 
int correlationId) {
 return Collections.singletonMap(error, 1);
 }
 
+protected Map errorCounts(Stream errors) {
+return errors.collect(Collectors.groupingBy(e -> e, 
Collectors.summingInt(e -> 1)));

Review comment:
   @ijuma I wrote a small microbenchmark (committed and reverted if you 
want to take a look) to compare performance. I picked `TxnOffsetCommitResponse` 
(more or less at random, but since it has two levels of nesting for topics and 
partitions it has a double loop) with an unrepresentitively large number of 
topics and partitions. 
   
   Using the old code (`errorCounts(errors())`), I got this test run:
   
   ```
   {NONE=2}
   run 0, times=2 took 50172790515ns, 398.62243647820753ops/s
   {NONE=2}
   run 1, times=2 took 49210843555ns, 406.4144923191004ops/s
   {NONE=2}
   run 2, times=2 took 49366208092ns, 405.1354311582437ops/s
   {NONE=2}
   run 3, times=2 took 48628565963ns, 411.2808922890589ops/s
   {NONE=2}
   run 4, times=2 took 48727847017ns, 410.4429237971969ops/s
   ```
   
   I aborted early because it was pretty slow. You can see the JIT is improving 
the performance a little over time.
   
   Using the streaming approach I got this test run:
   
   ```
   {NONE=2}
   run 0, times=2 took 6984797524ns, 2863.36145482805ops/s
   {NONE=2}
   run 1, times=2 took 6566254988ns, 3045.8762318171493ops/s
   {NONE=2}
   run 2, times=2 took 6553362923ns, 3051.868214074797ops/s
   {NONE=2}
   run 3, times=2 took 6259904961ns, 3194.936684279159ops/s
   {NONE=2}
   run 4, times=2 took 6675450385ns, 2996.052527772626ops/s
   {NONE=2}
   run 5, times=2 took 6949088789ns, 2878.075184714696ops/s
   {NONE=2}
   run 6, times=2 took 6045899635ns, 3308.02712704972ops/s
   {NONE=2}
   run 7, times=2 took 5845348664ns, 3421.5238730197325ops/s
   {NONE=2}
   run 8, times=2 took 6370088159ns, 3139.6739732311135ops/s
   {NONE=2}
   run 9, times=2 took 6799792822ns, 2941.2660831800854ops/s
   {NONE=2}
   run 10, times=2 took 6641092713ns, 3011.5525959831602ops/s
   {NONE=2}
   run 11, times=2 took 6621610314ns, 3020.4133211696576ops/s
   {NONE=2}
   run 12, times=2 took 6339235087ns, 3154.9547738045576ops/s
   {NONE=2}
   run 13, times=2 took 6461046814ns, 3095.473624593366ops/s
   {NONE=2}
   run 14, times=2 took 6585386195ns, 3037.027656052296ops/s
   {NONE=2}
   run 15, times=2 took 6565973868ns, 3046.0066484ops/s
   {NONE=2}
   run 16, times=2 took 6585253169ns, 3037.0890058031114ops/s
   {NONE=2}
   run 17, times=2 took 6618664562ns, 3021.7576087518905ops/s
   {NONE=2}
   run 18, times=2 took 6592603829ns, 3033.7026945290754ops/s
   {NONE=2}
   run 19, times=2 took 6567525693ns, 3045.2869063484604ops/s
   ```
   
   This is about 7½ times faster.
   
   Out of interest I also rewote the `TxnOffsetCommitResponse.errorCounts()` to 
use a `forEach()`:
   
   ```
   {NONE=2}
   run 0, times=2 took 6038137472ns, 3312.279671131012ops/s
   {NONE=2}
   run 1, times=2 took 5642135982ns, 3544.7568197231726ops/s
   {NONE=2}
   run 2, times=2 took 5551109425ns, 3602.883400195268ops/s
   {NONE=2}
   run 3, times=2 took 5511950192ns, 3628.4798126492215ops/s
   {NONE=2}
   run 4, times=2 took 5180664883ns, 3860.5083423999577ops/s
   {NONE=2}
   run 5, times=2 took 4571569172ns, 4374.865444997799ops/s
   {NONE=2}
   run 6, times=2 took 5472660241ns, 3654.529811692726ops/s
   {NONE=2}
   run 7, times=2 took 5499370051ns, 3636.780179279483ops/s
   {NONE=2}
   run 8, times=2 took 5523721146ns, 3620.7475850736946ops/s
   {NONE=2}
   run 9, times=2 took 4691001711ns, 4263.481710761627ops/s
   {NONE=2}
   run 10, times=2 took 5495174831ns, 3639.5566319698773ops/s
   {NONE=2}
   run 11, times=2 took 5676661773ns, 3523.1974001210237ops/s
   {NONE=2}
   run 12, times=2 took 5605106974ns, 3568.174540249194ops/s
   {NONE=2}
   run 13, times=2 took 5577604479ns, 3585.768778568137ops/s
   {NONE=2}
   run 14, times=2 took 5544332242ns, 3607.287429222572ops/s
   {NONE=2}
   run 15, times=2 took 5502312660ns, 3634.835247621134ops/s
   {NONE=2}
   run 16, times=2 took 5528323376ns, 3617.7333776865516ops/s
   {NONE=2}
   run 17, times=2 took 5528944581ns, 3617.3269069704934ops/s
   

[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-21 Thread GitBox


chia7712 commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r412284404



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,23 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)
 
   /**
-   * The remaining delay time
+   * true if the item is still delayed
*/
-  def getDelay(unit: TimeUnit): Long = {
-unit.convert(max(dueMs - Time.SYSTEM.milliseconds, 0), 
TimeUnit.MILLISECONDS)
+  def isDelayed: Boolean = {
+time.nanoseconds < dueNs
   }
 
-  def compareTo(d: Delayed): Int = {
-val other = d.asInstanceOf[DelayedItem]
-java.lang.Long.compare(dueMs, other.dueMs)
+  def compareTo(d: DelayedItem): Int = {

Review comment:
   This method was from ```Delayed``` so it seems to me it is ok to remove 
this method if this class does not extend ```Delayed``` anymore





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-21 Thread GitBox


chia7712 commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r412279368



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,24 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)
 
-class DelayedItem(val delayMs: Long) extends Delayed with Logging {
-
-  private val dueMs = Time.SYSTEM.milliseconds + delayMs
-
-  def this(delay: Long, unit: TimeUnit) = this(unit.toMillis(delay))
+  private val dueNs = time.nanoseconds + TimeUnit.MILLISECONDS.toNanos(delayMs)

Review comment:
   > I think it's very unlikely we'll ever hit it given our normal delays.
   
   you are right :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on issue #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on issue #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-61720


   > I saw that the stream time update codepath is missing, could we add it 
back? I was thinking about a simpler approach here: since we could not 
decrement the stream-time, no matter we are adding or removing partitions, we 
shall just iterate over all the result partitions and find the min partition 
time. If that min partition time is larger than current stream time, we advance 
it; otherwise we do nothing, WDYT? @guozhangwang @avalsa
   
   I think like it's in code: if we can skip update streamTime and it will not 
violate any requirements it's easiest what we can do and seems it will not 
impact anybody too much as it will be updated at first get next record request.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

2020-04-21 Thread GitBox


dajac commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r412182105



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
 assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+val overviews = List(
+  GroupOverview("group1", "protocol1", "Stable"),
+  GroupOverview("goupp2", "qwerty", "Empty")
+)
+val response = listGroupRequest(Option.empty, overviews)
+assertEquals(2, response.data.groups.size)
+assertEquals("", response.data.groups.get(0).groupState)
+assertEquals("", response.data.groups.get(1).groupState)
+  }
+
+  @Test
+  def testListGroupsRequestWithState(): Unit = {
+val overviews = List(
+  GroupOverview("group1", "protocol1", "Stable")
+)
+val response = listGroupRequest(Option.apply("Stable"), overviews)
+assertEquals(1, response.data.groups.size)
+assertEquals("Stable", response.data.groups.get(0).groupState)
+  }
+
+  private def listGroupRequest(state: Option[String], overviews: 
List[GroupOverview]): ListGroupsResponse = {
+EasyMock.reset(groupCoordinator, clientRequestQuotaManager, requestChannel)
+
+val data = new ListGroupsRequestData()
+if (state.isDefined)
+  data.setStates(Collections.singletonList(state.get))
+val listGroupsRequest = new ListGroupsRequest.Builder(data).build()
+val requestChannelRequest = buildRequest(listGroupsRequest)
+
+val capturedResponse = expectNoThrottling()
+val expectedStates = if (state.isDefined) List(state.get) else List()
+EasyMock.expect(groupCoordinator.handleListGroups(expectedStates))
+  .andReturn((Errors.NONE, overviews))
+EasyMock.replay(groupCoordinator, clientRequestQuotaManager, 
requestChannel)
+
+createKafkaApis().handleListGroupsRequest(requestChannelRequest)
+
+val response = readResponse(ApiKeys.LIST_GROUPS, listGroupsRequest, 
capturedResponse).asInstanceOf[ListGroupsResponse]
+assertEquals(Errors.NONE.code, response.data.errorCode)
+return response

Review comment:
   nit: `return` can be omitted here.

##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
 assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+val overviews = List(
+  GroupOverview("group1", "protocol1", "Stable"),
+  GroupOverview("goupp2", "qwerty", "Empty")
+)
+val response = listGroupRequest(Option.empty, overviews)

Review comment:
   nit: We tend to use `None` instead of `Option.empty`.

##
File path: core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
##
@@ -1024,6 +1071,11 @@ object ConsumerGroupCommand extends Logging {
 if (!options.has(groupOpt) && !options.has(allGroupsOpt))
   CommandLineUtils.printUsageAndDie(parser,
 s"Option $describeOpt takes one of these options: 
${allGroupSelectionScopeOpts.mkString(", ")}")
+val mutuallyExclusiveOpts: Set[OptionSpec[_]] = Set(membersOpt, 
offsetsOpt, stateOpt)
+if (mutuallyExclusiveOpts.toList.map(o => if (options.has(o)) 1 else 
0).sum > 1) {
+  CommandLineUtils.printUsageAndDie(parser,
+s"Option $describeOpt takes at most one of these options: 
$mutuallyExclusiveOpts")

Review comment:
We should build a string here: `mutuallyExclusiveOpts..mkString(", ")`

##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
 assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+val overviews = List(
+  GroupOverview("group1", "protocol1", "Stable"),
+  GroupOverview("goupp2", "qwerty", "Empty")
+)
+val response = listGroupRequest(Option.empty, overviews)
+assertEquals(2, response.data.groups.size)
+assertEquals("", response.data.groups.get(0).groupState)
+assertEquals("", response.data.groups.get(1).groupState)

Review comment:
   When I see this, I do wonder if it wouldn't be better to make the 
`GroupState` field in the response `nullable` and to set it to `null` when the 
state is not provided. Having to handle empty stings is a bit annoying. What do 
you think?

##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
 assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+val overviews = List(
+  GroupOverview("group1", "protocol1", "Stable"),
+  GroupOverview("goupp2", "qwerty", "Empty")
+)
+val response = listGroupRequest(Option.empty, overviews)
+assertEquals(2, response.data.groups.size)
+assertEquals("", 

[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412178952



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -179,6 +179,44 @@ public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener list
 
 TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
+CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+}
+
+@Test
+public void testRegexRecordsAreProcessedAfterReassignment() throws 
Exception {

Review comment:
   Yes, it fails on trunk.
   I found reason: here I tried to isolate tests: so we can create same topic 
in one test, delete it after we are done, do the same in next test, so on. But 
after each test it calls `streams.close()` in `teardown` method. I guess It 
tries to do smth with removed topics and gets `TimeoutException` because can't 
do it. may be it's issue but probably it happens only when close. But it seems 
to be not related to this task. @abbccdda What do you think? I fixed it easily 
(may be not best approach): call `streams.close` in test method body before 
deleting topics. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed

2020-04-21 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088692#comment-17088692
 ] 

Stanislav Kozlovski commented on KAFKA-9617:


Feel free to assign yourself [~showuon]!

> Replica Fetcher can mark partition as failed when max.message.bytes is changed
> --
>
> Key: KAFKA-9617
> URL: https://issues.apache.org/jira/browse/KAFKA-9617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Priority: Major
>  Labels: newbie
>
> There exists a race condition when changing the dynamic max.message.bytes 
> config for a topic. A follower replica can replicate a message that is over 
> that size after it processes the config change. When this happens, the 
> replica fetcher catches the unexpected exception, marks the partition as 
> failed and stops replicating it.
> {code:java}
> 06:38:46.596  Processing override for entityPath: topics/partition-1 with 
> config: Map(max.message.bytes -> 512)
> 06:38:46.597   [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] 
> Unexpected error occurred while processing data for partition partition-1 at 
> offset 20964
> org.apache.kafka.common.errors.RecordTooLargeException: The record batch size 
> in the append to partition-1 is 3349 bytes which exceeds the maximum 
> configured value of 512.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412178952



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -179,6 +179,44 @@ public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener list
 
 TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
+CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+}
+
+@Test
+public void testRegexRecordsAreProcessedAfterReassignment() throws 
Exception {

Review comment:
   I found reason: here I tried to isolate tests: so we can create same 
topic in one test, delete it after we are done, do the same in next test, so 
on. But after each test it calls `streams.close()` in `teardown` method. I 
guess It tries to do smth with removed topics and gets `TimeoutException` 
because can't do it. may be it's issue but probably it happens only when close. 
But it seems to be not related to this task. @abbccdda What do you think? I 
fixed it easily (may be not best approach): call `streams.close` in test method 
body before deleting topics. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412178952



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##
@@ -179,6 +179,44 @@ public void subscribe(final Pattern topics, final 
ConsumerRebalanceListener list
 
 TestUtils.waitForCondition(() -> 
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
 
+CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+}
+
+@Test
+public void testRegexRecordsAreProcessedAfterReassignment() throws 
Exception {

Review comment:
   I found reason: here I tried to isolate tests: so we can create same 
topic in one test, delete it after we are done, do the same in next test, so 
on. But after each test it calls `streams.close()` in `teardown` method. I 
guess It tries to do smth with removed topics and gets `TimeoutException` 
because can't do it. may be it's issue but probably it happens only when close. 
But it seems to be not related to this task. @abbccdda What do you think?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9850) Move KStream#repartition operator validation during Topology build process

2020-04-21 Thread HaiyuanZhao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088647#comment-17088647
 ] 

HaiyuanZhao commented on KAFKA-9850:


[~bchen225242]

I want to pick this up. Can u assign this to me? :)

> Move KStream#repartition operator validation during Topology build process 
> ---
>
> Key: KAFKA-9850
> URL: https://issues.apache.org/jira/browse/KAFKA-9850
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Major
>  Labels: help-wanted, newbie, newbie++
>
> `KStream#repartition` operation performs most of its validation regarding 
> joining, co-partitioning, etc after starting Kafka Streams instance. Some 
> parts of this validation can be detected much earlier, specifically during 
> topology `build()`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-21 Thread GitBox


dajac commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-617137684


   @apovzner Yeah, you're right. It does not increase the overall coverage.
   
   I spent quite some time in the `KafkaApis` recently and I noticed that its 
unit test coverage is quite low. If not mistaken, we have literally zero unit 
tests for the `LeaderAndIsr` and `UpdateMetadata` requests over there. I 
believe that having a reasonable unit tests coverage will help us to catch 
regression earlier in the development process. When one touches the 
`KafkaApis`, the first reflex is to run its unit tests to verify that things 
are not broken. 
   
   I do agree that adding a test which verifies that `IllegalStateException` is 
not thrown does not bring much. What do you think about adding few unit tests 
which verify respectively that a stale broker epoch is rejected and a valid 
broker epoch is accepted for the `LeaderAndIsr`, the `StopReplica` and the 
`UpdateMetadata` requests?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412099165



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -115,7 +115,7 @@ public void suspend() {
 }
 
 @Override
-public void resume() {
+public void resume(final boolean requiresUpdate) {

Review comment:
   agree. beautified it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


cadonna commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412047332



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) 
based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+
+private final PriorityQueue clientsByTaskLoad;
+private final BiFunction validClientCriteria;
+private final Set uniqueClients = new HashSet<>();
+
+ValidClientsByTaskLoadQueue(final Map clientStates,
+final BiFunction 
validClientCriteria) {
+this.validClientCriteria = validClientCriteria;
+
+clientsByTaskLoad = new PriorityQueue<>(
+(client, other) -> {
+final double clientTaskLoad = 
clientStates.get(client).taskLoad();
+final double otherTaskLoad = 
clientStates.get(other).taskLoad();
+if (clientTaskLoad < otherTaskLoad) {
+return -1;
+} else if (clientTaskLoad > otherTaskLoad) {
+return 1;
+} else {
+return client.compareTo(other);
+}
+});
+}
+
+/**
+ * @return the next least loaded client that satisfies the given criteria, 
or null if none do
+ */
+UUID poll(final TaskId task) {
+final List validClient = poll(task, 1);
+return validClient.isEmpty() ? null : validClient.get(0);
+}
+
+/**
+ * @return the next N <= {@code numClientsPerTask} clients in the 
underlying priority queue that are valid candidates for the given task
+ */
+List poll(final TaskId task, final int numClients) {
+final List nextLeastLoadedValidClients = new LinkedList<>();
+final Set invalidPolledClients = new HashSet<>();
+while (nextLeastLoadedValidClients.size() < numClients) {
+UUID candidateClient;
+while (true) {
+candidateClient = clientsByTaskLoad.poll();
+if (candidateClient == null) {
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+if (validClientCriteria.apply(candidateClient, task)) {
+nextLeastLoadedValidClients.add(candidateClient);
+break;
+} else {
+invalidPolledClients.add(candidateClient);
+}
+}
+}
+offerAll(invalidPolledClients);
+return nextLeastLoadedValidClients;
+}
+
+void offerAll(final Collection clients) {
+for (final UUID client : clients) {
+offer(client);
+}
+}
+
+void offer(final UUID client) {
+if (uniqueClients.contains(client)) {
+clientsByTaskLoad.remove(client);
+}
+clientsByTaskLoad.offer(client);
+uniqueClients.add(client);

Review comment:
   prop: I would not add the client if it is already contained in the set.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,103 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import 

[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412055008



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -305,6 +297,12 @@ public void resume() {
 default:
 throw new IllegalStateException("Illegal state " + state() + " 
while resuming active task " + id);
 }
+if (requiresUpdate) {
+partitionGroup.updatePartitions(inputPartitions(), 
recordQueueCreator::createQueue);
+if (state() != State.RESTORING) { // if task is RESTORING then 
topology will be initialized in completeRestoration

Review comment:
   when work on this task it fails some test that ensure that 
initializeTopology called once here. I thought that it might be important and 
decided to support this.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412049922



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1127,7 +1127,7 @@ public void 
shouldReadCommittedOffsetAndRethrowTimeoutWhenCompleteRestoration()
 }
 
 @Test
-public void shouldNotReInitializeTopologyWhenResuming() throws IOException 
{
+public void shouldNotReInitializeTopologyWhenResumingWithFalseFlag() 
throws IOException {

Review comment:
   fixed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on issue #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-04-21 Thread GitBox


tombentley commented on issue #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-617069226


   @dajac thanks for the review, I've addressed all your comments. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412020510



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -93,6 +96,29 @@ long partitionTimestamp(final TopicPartition partition) {
 return queue.partitionTime();
 }
 
+// creates queues for new partitions, removes old queues, saves cached 
records for previously assigned partitions
+void updatePartitions(final Set newInputPartitions, final 
Function recordQueueCreator) {
+final Set removedPartitions = new HashSet<>();
+final Iterator> queuesIterator 
= partitionQueues.entrySet().iterator();
+while (queuesIterator.hasNext()) {
+final Map.Entry queueEntry = 
queuesIterator.next();
+final TopicPartition topicPartition = queueEntry.getKey();
+if (newInputPartitions.contains(topicPartition)) {

Review comment:
   Yes, can rephrase as you offer.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412020510



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -93,6 +96,29 @@ long partitionTimestamp(final TopicPartition partition) {
 return queue.partitionTime();
 }
 
+// creates queues for new partitions, removes old queues, saves cached 
records for previously assigned partitions
+void updatePartitions(final Set newInputPartitions, final 
Function recordQueueCreator) {
+final Set removedPartitions = new HashSet<>();
+final Iterator> queuesIterator 
= partitionQueues.entrySet().iterator();
+while (queuesIterator.hasNext()) {
+final Map.Entry queueEntry = 
queuesIterator.next();
+final TopicPartition topicPartition = queueEntry.getKey();
+if (newInputPartitions.contains(topicPartition)) {

Review comment:
   Yes, can rephrased as you offer.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] avalsa commented on a change in pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-04-21 Thread GitBox


avalsa commented on a change in pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#discussion_r412015236



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
##
@@ -50,7 +53,7 @@
  */
 public class PartitionGroup {
 
-private final Map partitionQueues;
+private Map partitionQueues;

Review comment:
   agree





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR

2020-04-21 Thread GitBox


stanislavkozlovski commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r412014026



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1066,6 +1066,7 @@ class KafkaController(val config: KafkaConfig,
 // do this check only if the broker is live and there are no 
partitions being reassigned currently
 // and preferred replica election is not in progress
 val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp 
=> controllerContext.isReplicaOnline(leaderBroker, tp) &&
+  
controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr.contains(leaderBroker)
 &&

Review comment:
   I think we should do this check last. We also want to use an option to 
avoid any potential NPEs. e.g:
   ```
   controllerContext.partitionLeadershipInfo.get(partition).forall(...)
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #8525: [WIP] KAFKA-9885; Evict last members of a group when the maximum allowed is reached

2020-04-21 Thread GitBox


dajac opened a new pull request #8525:
URL: https://github.com/apache/kafka/pull/8525


   WIP
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed

2020-04-21 Thread Wang Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Ge reassigned KAFKA-9617:
--

Assignee: (was: Wang Ge)

> Replica Fetcher can mark partition as failed when max.message.bytes is changed
> --
>
> Key: KAFKA-9617
> URL: https://issues.apache.org/jira/browse/KAFKA-9617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Priority: Major
>  Labels: newbie
>
> There exists a race condition when changing the dynamic max.message.bytes 
> config for a topic. A follower replica can replicate a message that is over 
> that size after it processes the config change. When this happens, the 
> replica fetcher catches the unexpected exception, marks the partition as 
> failed and stops replicating it.
> {code:java}
> 06:38:46.596  Processing override for entityPath: topics/partition-1 with 
> config: Map(max.message.bytes -> 512)
> 06:38:46.597   [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] 
> Unexpected error occurred while processing data for partition partition-1 at 
> offset 20964
> org.apache.kafka.common.errors.RecordTooLargeException: The record batch size 
> in the append to partition-1 is 3349 bytes which exceeds the maximum 
> configured value of 512.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed

2020-04-21 Thread Wang Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Ge reassigned KAFKA-9617:
--

Assignee: Wang Ge

> Replica Fetcher can mark partition as failed when max.message.bytes is changed
> --
>
> Key: KAFKA-9617
> URL: https://issues.apache.org/jira/browse/KAFKA-9617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Wang Ge
>Priority: Major
>  Labels: newbie
>
> There exists a race condition when changing the dynamic max.message.bytes 
> config for a topic. A follower replica can replicate a message that is over 
> that size after it processes the config change. When this happens, the 
> replica fetcher catches the unexpected exception, marks the partition as 
> failed and stops replicating it.
> {code:java}
> 06:38:46.596  Processing override for entityPath: topics/partition-1 with 
> config: Map(max.message.bytes -> 512)
> 06:38:46.597   [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] 
> Unexpected error occurred while processing data for partition partition-1 at 
> offset 20964
> org.apache.kafka.common.errors.RecordTooLargeException: The record batch size 
> in the append to partition-1 is 3349 bytes which exceeds the maximum 
> configured value of 512.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] LiamClarkeNZ commented on a change in pull request #8520: Add explicit grace period to tumbling window example

2020-04-21 Thread GitBox


LiamClarkeNZ commented on a change in pull request #8520:
URL: https://github.com/apache/kafka/pull/8520#discussion_r412007274



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3262,12 +3262,15 @@ KTable-KTable 
Foreign-Key
 import org.apache.kafka.streams.kstream.TimeWindows;
 
 // A tumbling time window with a size of 5 minutes (and, by 
definition, an implicit
-// advance interval of 5 minutes).
+// advance interval of 5 minutes). Note the explicit grace 
period, as the current
+// default value is 24 hours, which may be larger than needed 
for smaller windows. 
+// Note that this default may change in future major version 
releases.

Review comment:
   Kia ora @vvcephei I have removed the comment in my latest commit :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-8924) Default grace period (-1) of TimeWindows causes suppress to emit events after 24h

2020-04-21 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088442#comment-17088442
 ] 

Bruno Cadonna commented on KAFKA-8924:
--

[~vvcephei] I like your proposal

{code:java}
public static TimeWindows ofSize(final Duration size)
{code}

because it makes the meaning of the method more explicit. 

I am undecided about

{code:java}
public static TimeWindows ofSizeAndGracePeriod(final Duration size, final 
Duration grace)
{code}

I see your point, but I find the API more elegant (i.e. better readable) with 
two distinct methods for size and grace. But that's my personal opinion and not 
a technical reason.


> Default grace period (-1) of TimeWindows causes suppress to emit events after 
> 24h
> -
>
> Key: KAFKA-8924
> URL: https://issues.apache.org/jira/browse/KAFKA-8924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Michał
>Assignee: Michał
>Priority: Major
>  Labels: needs-kip
>
> h2. Problem 
> The default creation of TimeWindows, like
> {code}
> TimeWindows.of(ofMillis(xxx))
> {code}
> calls an internal constructor
> {code}
> return new TimeWindows(sizeMs, sizeMs, -1, DEFAULT_RETENTION_MS);
> {code}
> And the *-1* parameter is the default grace period which I think is here for 
> backward compatibility
> {code}
> @SuppressWarnings("deprecation") // continuing to support 
> Windows#maintainMs/segmentInterval in fallback mode
> @Override
> public long gracePeriodMs() {
> // NOTE: in the future, when we remove maintainMs,
> // we should default the grace period to 24h to maintain the default 
> behavior,
> // or we can default to (24h - size) if you want to be super accurate.
> return graceMs != -1 ? graceMs : maintainMs() - size();
> }
> {code}
> The problem is that if you use a TimeWindows with gracePeriod of *-1* 
> together with suppress *untilWindowCloses*, it never emits an event.
> You can check the Suppress tests 
> (SuppressScenarioTest.shouldSupportFinalResultsForTimeWindows), where 
> [~vvcephei] was (maybe) aware of that and all the scenarios specify the 
> gracePeriod.
> I will add a test without it on my branch and it will fail.
> The test: 
> https://github.com/atais/kafka/commit/221a04dc40d636ffe93a0ad95dfc6bcad653f4db
>  
> h2. Now what can be done
> One easy fix would be to change the default value to 0, which works fine for 
> me in my project, however, I am not aware of the impact it would have done 
> due to the changes in the *gracePeriodMs* method mentioned before.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] leonardge opened a new pull request #8524: Avoid starting election for topics where preferred leader is not in s…

2020-04-21 Thread GitBox


leonardge opened a new pull request #8524:
URL: https://github.com/apache/kafka/pull/8524


   …ync.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9781) TimestampConverter / Allow to specify a time zone when converting unix epoch to string

2020-04-21 Thread fml2 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9781?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fml2 updated KAFKA-9781:

Description: 
TimestampConverter can convert a unix epoch value (long; number of milliseconds 
since Jan 01 1970 00:00 GMT) to string. However, when doing such conversion, 
the string result depends on the time zone used.

TimestampConverter uses UTC (i.e. GMT) for the conversion and does not allow to 
change it. But I would need this in order to get the date/time representation 
in my local time zone.

So I propose to introduce another config parameter (optional) for 
"target.type=string": *timeZone* (use java name as the value for the 
parameter). If no time zone is specified, UTC should be used, so that the 
change is backwards compatible.

  was:
TimestampConverter can convert a unix epoch value (long; number of milliseconds 
since Jan 01 1970 00:00 GMT) to string. However, when doing such conversion, 
the string result depends on the time zone used.

TimestampConverter uses UTC (i.e. GMT) for the conversion and does not allow to 
change it. But I would need this in order to get the date/time representation 
in my local time zone.

So I propose to introduce another config parameter (optional) for 
"target.type=string": *timeZone* (use java name for that). If no time zone is 
specified, UTC should be used.


> TimestampConverter / Allow to specify a time zone when converting unix epoch 
> to string
> --
>
> Key: KAFKA-9781
> URL: https://issues.apache.org/jira/browse/KAFKA-9781
> Project: Kafka
>  Issue Type: Wish
>  Components: KafkaConnect
>Reporter: fml2
>Priority: Major
>
> TimestampConverter can convert a unix epoch value (long; number of 
> milliseconds since Jan 01 1970 00:00 GMT) to string. However, when doing such 
> conversion, the string result depends on the time zone used.
> TimestampConverter uses UTC (i.e. GMT) for the conversion and does not allow 
> to change it. But I would need this in order to get the date/time 
> representation in my local time zone.
> So I propose to introduce another config parameter (optional) for 
> "target.type=string": *timeZone* (use java name as the value for the 
> parameter). If no time zone is specified, UTC should be used, so that the 
> change is backwards compatible.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9866) Do not attempt to elect preferred leader replicas which are outside ISR

2020-04-21 Thread Wang Ge (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wang Ge reassigned KAFKA-9866:
--

Assignee: Wang Ge

> Do not attempt to elect preferred leader replicas which are outside ISR
> ---
>
> Key: KAFKA-9866
> URL: https://issues.apache.org/jira/browse/KAFKA-9866
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Wang Ge
>Priority: Minor
>
> The controller automatically triggers a preferred leader election every N 
> minutes. It tries to elect all preferred leaders of partitions without doing 
> some basic checks like whether the leader is in sync.
> This leads to a multitude of errors which cause confusion:
> {code:java}
> April 14th 2020, 17:01:11.015 [Controller id=0] Partition TOPIC-9 failed to 
> complete preferred replica leader election to 1. Leader is still 0{code}
> {code:java}
> April 14th 2020, 17:01:11.002 [Controller id=0] Error completing replica 
> leader election (PREFERRED) for partition TOPIC-9
> kafka.common.StateChangeFailedException: Failed to elect leader for partition 
> TOPIC-9 under strategy PreferredReplicaPartitionLeaderElectionStrategy {code}
> It would be better if the Controller filtered out some of these elections, 
> not attempt them at all and maybe log an aggregate INFO-level log



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9895) Truncation request on broker start up may cause OffsetOutOfRangeException

2020-04-21 Thread Boquan Tang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088354#comment-17088354
 ] 

Boquan Tang commented on KAFKA-9895:


[~ijuma] We've only recently upgraded from 2.2.1 to 2.4.0, we would upgrade 
again for newer version in the near future, but to avoid too much of overhead 
we don't do it as often as every Kafka release.
Is this issue documented in another ticket and got fixed in 2.4.1/2.5.0? If so 
please feel free to close the ticket as already fixed.

> Truncation request on broker start up may cause OffsetOutOfRangeException
> -
>
> Key: KAFKA-9895
> URL: https://issues.apache.org/jira/browse/KAFKA-9895
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boquan Tang
>Priority: Major
>
> We have a 4 broker cluster running version 2.4.0.
> Upon broker restart, we frequently observe issue like this:
> {code}
> [2020-04-20 20:36:37,827] ERROR [ReplicaFetcher replicaId=4, leaderId=1, 
> fetcherId=0] Unexpected error occurred during truncation for topic-name-10 at 
> offset 632111354 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.OffsetOutOfRangeException: Received request 
> for offset 632111355 for partition active-ads-10, but we only have log 
> segments in the range 0 to 632111354.
> {code}
> The partition experiencing this issue seems random. Could we actually ignore 
> this kind of error and not put this partition to offline? From what the error 
> log describes, I think once the start up finishes, and the partition catches 
> up with leader, it should be OK to put it back to ISR. Please help me if I'm 
> understanding it incorrectly.
> This happens after we updated to 2.4.0, so I'm wondering if it has anything 
> to do with this specific version or not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9895) Truncation request on broker start up may cause OffsetOutOfRangeException

2020-04-21 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17088352#comment-17088352
 ] 

Ismael Juma commented on KAFKA-9895:


Thanks for the report. Is there a reason you are using 2.4.0 instead of 2.4.1 
or 2.5.0? There are some important fixes in the newer versions.

> Truncation request on broker start up may cause OffsetOutOfRangeException
> -
>
> Key: KAFKA-9895
> URL: https://issues.apache.org/jira/browse/KAFKA-9895
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Boquan Tang
>Priority: Major
>
> We have a 4 broker cluster running version 2.4.0.
> Upon broker restart, we frequently observe issue like this:
> {code}
> [2020-04-20 20:36:37,827] ERROR [ReplicaFetcher replicaId=4, leaderId=1, 
> fetcherId=0] Unexpected error occurred during truncation for topic-name-10 at 
> offset 632111354 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.OffsetOutOfRangeException: Received request 
> for offset 632111355 for partition active-ads-10, but we only have log 
> segments in the range 0 to 632111354.
> {code}
> The partition experiencing this issue seems random. Could we actually ignore 
> this kind of error and not put this partition to offline? From what the error 
> log describes, I think once the start up finishes, and the partition catches 
> up with leader, it should be OK to put it back to ISR. Please help me if I'm 
> understanding it incorrectly.
> This happens after we updated to 2.4.0, so I'm wondering if it has anything 
> to do with this specific version or not.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)