[GitHub] [kafka] ijuma merged pull request #9977: MINOR: Update zookeeper to 3.5.9

2021-01-27 Thread GitBox


ijuma merged pull request #9977:
URL: https://github.com/apache/kafka/pull/9977


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9977: MINOR: Update zookeeper to 3.5.9

2021-01-27 Thread GitBox


ijuma commented on pull request #9977:
URL: https://github.com/apache/kafka/pull/9977#issuecomment-768290763


   All JUnit tests passed.







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272942#comment-17272942
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

omkreddy commented on pull request #324:
URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768372236


   We should also add these docs to `kafka/docs` repo





> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565448514



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
##
@@ -68,6 +69,19 @@ protected CreateTopicsResult(Map> fu
 return futures.get(topic).thenApply(TopicMetadataAndConfig::config);
 }
 
+/**
+ * Returns a future that provides topic ID for the topic when the request completes.
+ * 
+ * If broker version doesn't support replication factor in the response, throw
+ * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+ * If broker returned an error for topic configs, throw appropriate exception. For example,
+ * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is thrown if user does not
+ * have permission to describe topic configs.
+ */
+public KafkaFuture<Uuid> topicId(String topic) {
+return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId);
+}
+
 /**
  * Returns a future that provides number of partitions in the topic when the request completes.

Review comment:
   I thought it was odd too.
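The new accessor follows the same pattern as the existing `config` accessor: look up the topic's future and project a single field out of it with `thenApply`. Below is a minimal JDK-only sketch of that pattern, using `CompletableFuture` in place of `KafkaFuture` and `java.util.UUID` in place of Kafka's `Uuid`; `TopicResultsSketch` and `TopicMetadata` are hypothetical stand-ins, not Kafka classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

class TopicResultsSketch {
    // Hypothetical stand-in for the per-topic result (TopicMetadataAndConfig in the diff).
    static final class TopicMetadata {
        final UUID topicId;
        final int numPartitions;

        TopicMetadata(UUID topicId, int numPartitions) {
            this.topicId = topicId;
            this.numPartitions = numPartitions;
        }
    }

    private final Map<String, CompletableFuture<TopicMetadata>> futures;

    TopicResultsSketch(Map<String, CompletableFuture<TopicMetadata>> futures) {
        this.futures = futures;
    }

    // Same shape as the new accessor: look up the topic's future and project one field.
    // If the underlying future failed (e.g. with an authorization error), the
    // returned future also completes exceptionally.
    CompletableFuture<UUID> topicId(String topic) {
        return futures.get(topic).thenApply(metadata -> metadata.topicId);
    }

    CompletableFuture<Integer> numPartitions(String topic) {
        return futures.get(topic).thenApply(metadata -> metadata.numPartitions);
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        Map<String, CompletableFuture<TopicMetadata>> futures = new HashMap<>();
        futures.put("orders", CompletableFuture.completedFuture(new TopicMetadata(id, 3)));
        TopicResultsSketch results = new TopicResultsSketch(futures);
        System.out.println(results.topicId("orders").join().equals(id)); // true
        System.out.println(results.numPartitions("orders").join());      // 3
    }
}
```

Projecting from one shared per-topic future means every accessor observes the same success or failure for a given topic.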









[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565448192



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
  val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)

Review comment:
   Good idea.
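The name-resolution branch in this hunk can be illustrated in isolation. Below is a minimal JDK-only sketch, assuming `java.util.UUID` in place of Kafka's `Uuid` and a plain `Map` in place of `controllerContext.topicNames`; the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class DeleteTopicNameResolution {
    // Stand-in for Uuid.ZERO_UUID: an all-zero ID means "no topic ID was supplied".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Mirrors the branch in the diff: use the supplied name when no topic ID was
    // given; otherwise look the name up by ID. Map.get returns null for an unknown
    // ID, like getOrElse(..., null) in the Scala code.
    static String resolveName(UUID topicId, String suppliedName, Map<UUID, String> topicNames) {
        if (topicId.equals(ZERO_UUID)) {
            return suppliedName;
        }
        return topicNames.get(topicId);
    }

    public static void main(String[] args) {
        UUID known = UUID.randomUUID();
        Map<UUID, String> topicNames = new HashMap<>();
        topicNames.put(known, "orders");
        System.out.println(resolveName(ZERO_UUID, "payments", topicNames));   // payments
        System.out.println(resolveName(known, null, topicNames));             // orders
        System.out.println(resolveName(new UUID(1L, 2L), null, topicNames));  // null
    }
}
```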









[GitHub] [kafka] chia7712 commented on a change in pull request #9964: MINOR: remove duplicate code of serializing auto-generated data

2021-01-27 Thread GitBox


chia7712 commented on a change in pull request #9964:
URL: https://github.com/apache/kafka/pull/9964#discussion_r565287398



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
##
@@ -265,14 +265,7 @@ public ByteBuffer encode() {
 "Should never try to encode a SubscriptionInfo with version [" +
 data.version() + "] > LATEST_SUPPORTED_VERSION [" + LATEST_SUPPORTED_VERSION + "]"
 );
-} else {
-final ObjectSerializationCache cache = new ObjectSerializationCache();
-final ByteBuffer buffer = ByteBuffer.allocate(data.size(cache, (short) data.version()));
-final ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
-data.write(accessor, cache, (short) data.version());
-buffer.rewind();

Review comment:
   In this case, the buffer state after `flip` coincides with the state after
`rewind`. The buffer is completely filled, so `limit` always equals `capacity`,
even though `rewind` does not reset `limit`.
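A JDK-only sketch of the point above, using a hypothetical `filled` helper: when a buffer is allocated to exactly the encoded size and then completely written, `rewind()` and `flip()` leave it in the same state. The two only diverge when the buffer is partially filled.

```java
import java.nio.ByteBuffer;

class FlipVsRewind {
    // Allocates exactly `size` bytes and writes all of them, like encode() does
    // after computing data.size(...) up front.
    static ByteBuffer filled(int size) {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        for (int i = 0; i < size; i++) {
            buffer.put((byte) i);
        }
        return buffer; // position == limit == capacity == size
    }

    public static void main(String[] args) {
        ByteBuffer a = filled(8);
        a.rewind(); // position = 0, limit unchanged (still 8, the capacity)
        ByteBuffer b = filled(8);
        b.flip();   // limit = old position (8), position = 0
        System.out.println(a.position() + " " + a.limit()); // 0 8
        System.out.println(b.position() + " " + b.limit()); // 0 8

        // Partially filled buffer: here the two calls differ.
        ByteBuffer partial = ByteBuffer.allocate(8);
        partial.put((byte) 1).put((byte) 2);
        ByteBuffer rewound = partial.duplicate();
        rewound.rewind(); // limit stays 8, so 6 unwritten bytes would be readable
        partial.flip();   // limit = 2, so only the written bytes are readable
        System.out.println(rewound.remaining() + " " + partial.remaining()); // 8 2
    }
}
```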









[GitHub] [kafka] bbejeck commented on a change in pull request #9738: KAFKA-8744: Update Scala API to give names to processors

2021-01-27 Thread GitBox


bbejeck commented on a change in pull request #9738:
URL: https://github.com/apache/kafka/pull/9738#discussion_r565371239



##
File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##
@@ -391,4 +390,57 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 
 testDriver.close()
   }
+
+  "setting a name on a filter processor" should "pass the name to the topology" in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
+table
+  .filter((key, value) => key.equals("a") && value == 1, Named.as("my-name"))
+  .toStream
+  .to(sinkTopic)
+
+import scala.jdk.CollectionConverters._
+
+val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(3)
+filterNode.name() shouldBe "my-name"
+  }
+
+  "setting a name on a count processor" should "pass the name to the topology" in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count(Named.as("my-name"))
+table.toStream.to(sinkTopic)
+
+import scala.jdk.CollectionConverters._
+
+val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(1)
+filterNode.name() shouldBe "my-name"

Review comment:
   shouldn't this line verify the `countNode` name?









[GitHub] [kafka] tang7526 edited a comment on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-27 Thread GitBox


tang7526 edited a comment on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-768321328


   > @tang7526 How about using import for this issue? Other references in Javadocs use import also.
   
   Done







[GitHub] [kafka] mdespriee commented on pull request #9738: KAFKA-8744: Update Scala API to give names to processors

2021-01-27 Thread GitBox


mdespriee commented on pull request #9738:
URL: https://github.com/apache/kafka/pull/9738#issuecomment-768282420


   @bbejeck of course. Just rebased on trunk and added a couple of tests.







[jira] [Comment Edited] (KAFKA-9689) Automatic broker version detection to initialize stream client

2021-01-27 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272901#comment-17272901
 ] 

feyman edited comment on KAFKA-9689 at 1/27/21, 2:57 PM:
-

The version detection flow leveraging the versioning system is as described in 
the section "Use case: {{group_coordinator}} feature flag" in KIP-584.

The code change mainly contains 3 parts:

1) StreamThread should know whether it is the leader of the consumer group; if 
so, it should periodically query the describeFeatures API to check for feature 
metadata updates.

2) There should be some place to put the feature metadata in the 
MemberMetadata, either in the assignment (userData) or in a new field in the 
MemberMetadata (which involves a public interface change). The current 
implementation leverages the assignment.

3) The StreamThread should dynamically switch to the new thread producer 
without affecting the existing tasks that
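Part 2) proposes piggy-backing the feature metadata on the assignment `userData`. A minimal JDK-only sketch of such an encoding follows, assuming a hypothetical layout that carries only each feature's finalized max version level; it is not the format used by Kafka Streams' assignment info, and the class and method names are illustrative. The feature name `group_coordinator` is taken from the KIP-584 use case cited above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class FeatureUserDataSketch {
    // Hypothetical layout: [int count], then per feature
    // [short nameLength][UTF-8 name bytes][short maxVersionLevel].
    static ByteBuffer encode(Map<String, Short> finalizedFeatures) {
        int size = 4;
        for (String name : finalizedFeatures.keySet()) {
            size += 2 + name.getBytes(StandardCharsets.UTF_8).length + 2;
        }
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.putInt(finalizedFeatures.size());
        for (Map.Entry<String, Short> entry : finalizedFeatures.entrySet()) {
            byte[] name = entry.getKey().getBytes(StandardCharsets.UTF_8);
            buffer.putShort((short) name.length);
            buffer.put(name);
            buffer.putShort(entry.getValue());
        }
        buffer.flip(); // prepare the buffer for reading by group members
        return buffer;
    }

    static Map<String, Short> decode(ByteBuffer buffer) {
        Map<String, Short> features = new LinkedHashMap<>();
        int count = buffer.getInt();
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buffer.getShort()];
            buffer.get(name);
            short maxVersionLevel = buffer.getShort();
            features.put(new String(name, StandardCharsets.UTF_8), maxVersionLevel);
        }
        return features;
    }

    public static void main(String[] args) {
        Map<String, Short> finalized = new LinkedHashMap<>();
        finalized.put("group_coordinator", (short) 1);
        ByteBuffer userData = encode(finalized); // leader side
        System.out.println(decode(userData));     // member side: {group_coordinator=1}
    }
}
```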



> Automatic broker version detection to initialize stream client
> --
>
> Key: KAFKA-9689
> URL: https://issues.apache.org/jira/browse/KAFKA-9689
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> Eventually we shall deprecate the flag that suppresses the EOS thread producer 
> feature, and instead use version detection on the broker to decide which 
> semantics to use.





[GitHub] [kafka] tombentley commented on pull request #9847: KAFKA-10703: Better handling and doc for config defaults of topics

2021-01-27 Thread GitBox


tombentley commented on pull request #9847:
URL: https://github.com/apache/kafka/pull/9847#issuecomment-768284951


   @chia7712 any chance of a 2nd review here?







[GitHub] [kafka] dengziming opened a new pull request #9982: MINOR: remove some explicit type argument in generator

2021-01-27 Thread GitBox


dengziming opened a new pull request #9982:
URL: https://github.com/apache/kafka/pull/9982


   *More detailed description of your change*
   From `ArrayList<Integer> newCollection = new ArrayList<Integer>(arrayLength)`
to `ArrayList<Integer> newCollection = new ArrayList<>(arrayLength)`
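For context on this change: the diamond operator (Java 7+) lets the compiler infer a constructor's type argument from the declared type, so the explicit argument is redundant. Because generics are erased at runtime, the two forms behave identically. A minimal sketch with hypothetical method names:

```java
import java.util.ArrayList;
import java.util.List;

class DiamondSketch {
    static List<Integer> explicitTypeArgument(int arrayLength) {
        // Before: the type argument is spelled out on both sides.
        ArrayList<Integer> newCollection = new ArrayList<Integer>(arrayLength);
        return newCollection;
    }

    static List<Integer> diamond(int arrayLength) {
        // After: the compiler infers <Integer> from the declared type.
        ArrayList<Integer> newCollection = new ArrayList<>(arrayLength);
        return newCollection;
    }

    public static void main(String[] args) {
        List<Integer> a = explicitTypeArgument(4);
        List<Integer> b = diamond(4);
        a.add(1);
        b.add(1);
        System.out.println(a.equals(b)); // true
    }
}
```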
   
   *Summary of testing strategy (including rationale)*
   QA
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272938#comment-17272938
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck merged pull request #324:
URL: https://github.com/apache/kafka-site/pull/324


   







[GitHub] [kafka] chia7712 commented on pull request #9981: MINOR: Upgrade to Scala 2.12.13

2021-01-27 Thread GitBox


chia7712 commented on pull request #9981:
URL: https://github.com/apache/kafka/pull/9981#issuecomment-768294345


   The failed tests pass on my local machine. I will trigger QA again.







[jira] [Commented] (KAFKA-9689) Automatic broker version detection to initialize stream client

2021-01-27 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272901#comment-17272901
 ] 

feyman commented on KAFKA-9689:
---

The version detection flow leveraging the versioning system is as described in 
the section "Use case: {{group_coordinator}} feature flag" in KIP-584.

The code change mainly contains 3 parts:

1) StreamThread should know whether it is the leader of the consumer group; if 
so, it should periodically query the describeFeatures API to check for feature 
metadata updates.

2) There should be some place to put the feature metadata in the 
MemberMetadata, either in the assignment (userData) or in a new field in the 
MemberMetadata (which involves a public interface change).

3) The StreamThread should dynamically switch to the new thread producer 
without affecting the existing tasks that



[GitHub] [kafka] chia7712 commented on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-27 Thread GitBox


chia7712 commented on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-768277706


   @tang7526 How about using import for this issue? Other references in 
Javadocs use import also.







[GitHub] [kafka] tang7526 commented on pull request #9939: MINOR: fix @link tag in javadoc

2021-01-27 Thread GitBox


tang7526 commented on pull request #9939:
URL: https://github.com/apache/kafka/pull/9939#issuecomment-768321328


   > @tang7526 How about using import for this issue? Other references in Javadocs use import also.
   
   I tried that before and it didn't work.







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272977#comment-17272977
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck commented on pull request #324:
URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768404929


   > We should also add these docs to kafka/docs repo
   
   @omkreddy, yes a PR for kafka/docs is coming soon







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273055#comment-17273055
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck commented on pull request #326:
URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768461055


   ping @miguno for a +1





> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Michael G. Noll
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9969: MINOR: updated upgrade and architecture for KIP-663, KIP-696, and KIP-671

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9969:
URL: https://github.com/apache/kafka/pull/9969#discussion_r565518232



##
File path: docs/streams/upgrade-guide.html
##
@@ -91,6 +91,29 @@ Streams API
 We extended StreamJoined to include the options withLoggingEnabled() and withLoggingDisabled() in
 <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs">KIP-689</a>.
 
+
+We added two new methods to Kafka Streams, namely addThread() and removeThread() in
+<a href="https://cwiki.apache.org/confluence/x/FDd4CQ">KIP-663</a>.
+These enabled adding a removing StreamThreads to running KafkaStreams client.
+   
+
+We deprecated setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+in favor of setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
+in <a href="https://cwiki.apache.org/confluence/x/lkN4CQ">KIP-671</a>.
+The default handler will close the client and the client will transit to state ERROR.
+If you implement a custom handler, the new interface allows you to return a StreamThreadExceptionResponse,
+which will determine how the application will respond to a thread failure.
+
+
+Changes in <a href="https://cwiki.apache.org/confluence/x/FDd4CQ">KIP-663</a> necessitated the KafkaStreams client
+state machine to update, which was done in <a href="https://cwiki.apache.org/confluence/x/lCvZCQ">KIP-696</a>..
+The ERROR state is now terminal with PENDING_ERROR being a transitional state where the resources are closing.
+The ERROR state indicates that there something wrong and should not be blindly restarted without classifying

Review comment:
   I went with `is something wrong and the KafkaStreams client should not 
be blindly restarted`
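The state-machine change described in this hunk (ERROR is terminal, PENDING_ERROR is transitional while resources close) can be pictured as a small transition table. This is a hypothetical, reduced model for illustration only; the real `KafkaStreams.State` enum has more states and transitions than shown here.

```java
import java.util.EnumSet;
import java.util.Set;

class StreamsStateSketch {
    // Hypothetical, reduced model of the client states named in the upgrade note.
    // It is NOT org.apache.kafka.streams.KafkaStreams.State.
    enum State {
        RUNNING, PENDING_ERROR, ERROR;

        Set<State> validNextStates() {
            switch (this) {
                case RUNNING:
                    return EnumSet.of(PENDING_ERROR);   // e.g. the handler decides to shut the client down
                case PENDING_ERROR:
                    return EnumSet.of(ERROR);           // resources are being closed
                default:
                    return EnumSet.noneOf(State.class); // ERROR is terminal
            }
        }

        boolean canTransitionTo(State next) {
            return validNextStates().contains(next);
        }
    }

    public static void main(String[] args) {
        System.out.println(State.PENDING_ERROR.canTransitionTo(State.ERROR)); // true
        System.out.println(State.ERROR.canTransitionTo(State.RUNNING));       // false
    }
}
```

Because ERROR has no outgoing transitions, recovering from it requires creating a new client rather than restarting the existing one, which matches the upgrade note's advice against blind restarts.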









[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9979:
URL: https://github.com/apache/kafka/pull/9979#discussion_r565517686



##
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -511,6 +513,9 @@ public void testSerialization() throws Exception {
 checkRequest(createAlterClientQuotasRequest(), true);
 checkErrorResponse(createAlterClientQuotasRequest(), unknownServerException, true);
 checkResponse(createAlterClientQuotasResponse(), 0, true);
+checkRequest(createDescribeProducersRequest(), true);
+checkErrorResponse(createDescribeProducersRequest(), unknownServerException, true);
+checkResponse(createDescribeProducersResponse(), 0, true);

Review comment:
   Sounds reasonable. I'm also not super fond of this test pattern.









[jira] [Commented] (KAFKA-6223) Please delete old releases from mirroring system

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273066#comment-17273066
 ] 

ASF GitHub Bot commented on KAFKA-6223:
---

mimaison merged pull request #322:
URL: https://github.com/apache/kafka-site/pull/322


   





> Please delete old releases from mirroring system
> 
>
> Key: KAFKA-6223
> URL: https://issues.apache.org/jira/browse/KAFKA-6223
> Project: Kafka
>  Issue Type: Bug
> Environment: https://dist.apache.org/repos/dist/release/kafka/
>Reporter: Sebb
>Assignee: Rajini Sivaram
>Priority: Major
>
> To reduce the load on the ASF mirrors, projects are required to delete old 
> releases [1]
> Please can you remove all non-current releases?
> It's unfair to expect the 3rd party mirrors to carry old releases.
> Note that older releases can still be linked from the download page, but such 
> links should use the archive server at:
> https://archive.apache.org/dist/kafka/
> A suggested process is:
> + Change the download page to use archive.a.o for old releases
> + Delete the corresponding directories from 
> {{https://dist.apache.org/repos/dist/release/kafka/}}
> e.g. {{svn delete https://dist.apache.org/repos/dist/release/kafka/0.8.0}}
> Thanks!
> [1] http://www.apache.org/dev/release.html#when-to-archive





[GitHub] [kafka] wcarlson5 opened a new pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 opened a new pull request #9984:
URL: https://github.com/apache/kafka/pull/9984


   add timeout and static group rebalance to remove thread
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] miguno opened a new pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)

2021-01-27 Thread GitBox


miguno opened a new pull request #9983:
URL: https://github.com/apache/kafka/pull/9983


   This adds a new user-facing "Geo-replication (Cross-Cluster Data Mirroring)"
section to the Kafka Operations documentation that covers MirrorMaker v2.
   
   This was already merged to `kafka-site` via 
https://github.com/apache/kafka-site/pull/324.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272987#comment-17272987
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

miguno commented on pull request #324:
URL: https://github.com/apache/kafka-site/pull/324#issuecomment-768416513


   kafka/docs PR is up at https://github.com/apache/kafka/pull/9983







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273030#comment-17273030
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck opened a new pull request #326:
URL: https://github.com/apache/kafka-site/pull/326


   The MM2 docs are already in for 2.7 via 
https://github.com/apache/kafka-site/pull/324; this PR adds them to 2.6.







[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
   val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
 results.add(new DeletableTopicResult()
-  .setName(topic))
+  .setName(name)
+  .setTopicId(topic.topicId()))
   }
   val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
 results.asScala)(_.name)
   results.forEach { topic =>
- if (!authorizedTopics.contains(topic.name))
+ val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null
+ val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID)

Review comment:
   Thinking on this more, I can simplify the line. If topic name is null, 
then we didn't have a valid topic ID.
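The name-resolution step in the hunk above, and the observation that a null name can only come from a topic ID that did not resolve, can be sketched in plain Java. This is a hypothetical illustration, not the KafkaApis code: `java.util.UUID` stands in for Kafka's `Uuid`, the all-zero UUID stands in for `Uuid.ZERO_UUID`, and a plain map stands in for `controllerContext.topicNames`. It also assumes that entries addressed by name always carry a non-null name.

```java
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: java.util.UUID stands in for Kafka's Uuid, and the
// idToName map stands in for the controller's topic-id-to-name lookup.
final class DeleteTopicNameResolver {
    // Stand-in for Uuid.ZERO_UUID: an all-zero id means "addressed by name".
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private DeleteTopicNameResolver() {}

    /**
     * Returns the supplied name when no id was given, otherwise the name the id
     * maps to. A null result therefore means a non-zero id was supplied but is
     * unknown, assuming name-based entries always carry a non-null name.
     */
    static String resolveName(String suppliedName, UUID topicId, Map<UUID, String> idToName) {
        if (ZERO_UUID.equals(topicId)) {
            return suppliedName;
        }
        return idToName.get(topicId); // null when the id is not known
    }
}
```

Under that assumption, a single `name != null` check is enough to tell whether an ID-based entry was found.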









[GitHub] [kafka] vvcephei commented on pull request #9836: KAFKA-10866: Add metadata to ConsumerRecords

2021-01-27 Thread GitBox


vvcephei commented on pull request #9836:
URL: https://github.com/apache/kafka/pull/9836#issuecomment-768546193


   Most of those failures were known flaky tests, but one was an EasyMock 
error. I'm not able to repro it locally after a rebase, though. Rebased, 
pushed, and trying one more time to get a clean build.







[GitHub] [kafka] bbejeck commented on pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)

2021-01-27 Thread GitBox


bbejeck commented on pull request #9983:
URL: https://github.com/apache/kafka/pull/9983#issuecomment-768429896


   cherry-picked to 2.7 and 2.6







[GitHub] [kafka] vvcephei commented on a change in pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-27 Thread GitBox


vvcephei commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r565519066



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/Branched.java
##
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * The {@code Branched} class is used to define the optional parameters when building branches with
+ * {@link BranchedKStream}.
+ *
+ * @param  type of record key
+ * @param  type of record value
+ */
+public class Branched implements NamedOperation> {
+
+protected final String name;
+protected final Function, ? extends KStream> chainFunction;
+protected final Consumer> chainConsumer;
+
+protected Branched(final String name,
+   final Function, ? extends KStream> chainFunction,
+   final Consumer> chainConsumer) {
+this.name = name;
+this.chainFunction = chainFunction;
+this.chainConsumer = chainConsumer;
+}
+
+/**
+ * Create an instance of {@code Branched} with provided branch name suffix.
+ *
+ * @param name the branch name suffix to be used. If {@code null}, a default branch name suffix will be generated
+ * (see {@link BranchedKStream} description for details)
+ * @param   key type
+ * @param   value type
+ * @return a new instance of {@code Branched}
+ */
+public static  Branched as(final String name) {
+return new Branched<>(name, null, null);

Review comment:
   I agree, it seems like a good idea to check for `null` here.
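A minimal sketch of the suggested null check, using a hypothetical stand-in class (`BranchedSketch` is not the Kafka `Branched` class). Note that the javadoc in the quoted hunk currently says a `null` name yields a generated default suffix, so if `null` is rejected that sentence would need to change accordingly.

```java
import java.util.Objects;

// Hypothetical stand-in for Branched; only the null check on the factory is illustrated.
final class BranchedSketch<K, V> {
    private final String name;

    private BranchedSketch(final String name) {
        this.name = name;
    }

    /** Rejects a null suffix up front instead of failing later while building the topology. */
    static <K, V> BranchedSketch<K, V> as(final String name) {
        Objects.requireNonNull(name, "name cannot be null");
        return new BranchedSketch<>(name);
    }

    String name() {
        return name;
    }
}
```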









[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273059#comment-17273059
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

miguno commented on pull request #326:
URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768466468


   This LGTM, though (1) there were some minor HTML changes not directly 
related to the original PR and (2) I didn't test this PR locally myself to 
ensure proper HTML rendering etc.





> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Michael G. Noll
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273073#comment-17273073
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck commented on pull request #326:
URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768481852


   > I didn't test this PR locally myself to ensure proper HTML rendering etc.
   FWIW I rendered it locally and it seemed fine





> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Michael G. Noll
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565591713



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
 val results = new DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
 val toDelete = mutable.Set[String]()
 if (!controller.isActive) {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(Errors.NOT_CONTROLLER.code))
   }
   sendResponseCallback(results)
 } else if (!config.deleteTopicEnable) {
   val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
 results.add(new DeletableTopicResult()
-  .setName(topic)
+  .setName(topic.name())
+  .setTopicId(topic.topicId())
   .setErrorCode(error.code))
   }
   sendResponseCallback(results)
 } else {
-  deleteTopicRequest.data.topicNames.forEach { topic =>
+  deleteTopicRequest.topics().forEach { topic =>
+val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+  else controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
 results.add(new DeletableTopicResult()
-  .setName(topic))
+  .setName(name)
+  .setTopicId(topic.topicId()))
   }
   val authorizedTopics = authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
 results.asScala)(_.name)
   results.forEach { topic =>
- if (!authorizedTopics.contains(topic.name))
+ val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && topic.name() != null
+ val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID)

Review comment:
   Thinking on this more, I can simplify the line. Especially if I make 
changes with the code above.









[GitHub] [kafka] bbejeck commented on pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)

2021-01-27 Thread GitBox


bbejeck commented on pull request #9983:
URL: https://github.com/apache/kafka/pull/9983#issuecomment-768420998


   merged #9983 into trunk







[GitHub] [kafka] bbejeck merged pull request #9983: KAFKA-8930: MirrorMaker v2 documentation (#324)

2021-01-27 Thread GitBox


bbejeck merged pull request #9983:
URL: https://github.com/apache/kafka/pull/9983


   







[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9979:
URL: https://github.com/apache/kafka/pull/9979#discussion_r565525057



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3365,6 +3366,44 @@ class KafkaApis(val requestChannel: RequestChannel,
 }
   }
 
+  def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
+val describeProducersRequest = request.body[DescribeProducersRequest]
+
+def partitionError(topicPartition: TopicPartition, error: Errors): DescribeProducersResponseData.PartitionResponse = {
+  new DescribeProducersResponseData.PartitionResponse()
+.setPartitionIndex(topicPartition.partition)
+.setErrorCode(error.code)
+}
+
+val response = new DescribeProducersResponseData()
+describeProducersRequest.data.topics.forEach { topicRequest =>
+  val topicResponse = new DescribeProducersResponseData.TopicResponse()
+.setName(topicRequest.name)
+  val topicError = if (!authHelper.authorize(request.context, READ, TOPIC, topicRequest.name))
+Some(Errors.TOPIC_AUTHORIZATION_FAILED)
+  else if (!metadataCache.contains(topicRequest.name))
+Some(Errors.UNKNOWN_TOPIC_OR_PARTITION)
+  else
+None
+
+  topicRequest.partitionIndexes.forEach { partitionId =>
+val topicPartition = new TopicPartition(topicRequest.name, partitionId)
+val partitionResponse = topicError match {
+  case Some(error) => partitionError(topicPartition, error)
+  case None => replicaManager.activeProducerState(topicPartition)
+}
+topicResponse.partitions.add(partitionResponse)
+  }
+
+  if (!topicResponse.partitions.isEmpty) {
+response.topics.add(topicResponse)
+  }

Review comment:
   Hmm... It's been a while since I wrote this. I agree it looks a little 
strange. I guess there's probably no harm echoing back the same structure that 
was sent.
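The error precedence in `handleDescribeProducersRequest` can be sketched without Kafka types: authorization is checked before existence, and the topic-level error is copied onto every requested partition. The sketch also echoes back every requested partition, in line with the reply above. Strings stand in for the `Errors` enum and the two sets stand in for `authHelper` and `metadataCache`. All names are hypothetical. The `NONE` case is where the real handler would call `replicaManager.activeProducerState` instead.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical sketch: strings stand in for the Errors enum, and the
// authorized/known sets stand in for authHelper and metadataCache.
final class DescribeProducersSketch {
    private DescribeProducersSketch() {}

    /** Authorization is checked first, so an unauthorized caller gets the same error whether or not the topic exists. */
    static Optional<String> topicError(String topic, Set<String> authorized, Set<String> known) {
        if (!authorized.contains(topic)) {
            return Optional.of("TOPIC_AUTHORIZATION_FAILED");
        }
        if (!known.contains(topic)) {
            return Optional.of("UNKNOWN_TOPIC_OR_PARTITION");
        }
        return Optional.empty();
    }

    /** Echoes every requested partition back, tagged with the topic-level error or NONE. */
    static Map<Integer, String> partitionErrors(String topic, List<Integer> partitions,
                                                Set<String> authorized, Set<String> known) {
        String error = topicError(topic, authorized, known).orElse("NONE");
        Map<Integer, String> result = new LinkedHashMap<>();
        for (int partition : partitions) {
            result.put(partition, error);
        }
        return result;
    }
}
```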









[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r565575532



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1223,7 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 Set.empty[MetadataResponseTopic]
   else
 unauthorizedForDescribeTopics.map(topic =>
-  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))
+  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, false, util.Collections.emptyList()))

Review comment:
   So will we never reach this code path when using topic IDs? I think we 
are using topics to decide authorization. So in the case where we use ids and 
the name exists, then we will expose the name and return a zero ID? Might be 
useful to create an authorizer integration test with topic IDs to ensure 
correctness.









[jira] [Assigned] (KAFKA-8930) MM2 documentation

2021-01-27 Thread Michael G. Noll (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael G. Noll reassigned KAFKA-8930:
--

Assignee: Michael G. Noll  (was: Ryanne Dolan)

> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Michael G. Noll
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[GitHub] [kafka] hachikuji commented on a change in pull request #9979: KAFKA-12238; Implement `DescribeProducers` API from KIP-664

2021-01-27 Thread GitBox


hachikuji commented on a change in pull request #9979:
URL: https://github.com/apache/kafka/pull/9979#discussion_r565519812



##
File path: clients/src/main/resources/common/message/DescribeProducersResponse.json
##
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 61,
+  "type": "response",
+  "name": "DescribeProducersResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
+  "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
+{ "name": "Topics", "type": "[]TopicResponse", "versions": "0+",
+  "about": "Each topic in the response.", "fields": [
+  { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+"about": "The topic name" },
+  { "name": "Partitions", "type": "[]PartitionResponse", "versions": "0+",
+"about": "Each partition in the response.", "fields": [
+{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
+  "about": "The partition index." },
+{ "name": "ErrorCode", "type": "int16", "versions": "0+",
+  "about": "The partition error code, or 0 if there was no error." },

Review comment:
   I don't feel strongly about it. Perhaps it's better to have it than not. 
I will add it.









[GitHub] [kafka] vvcephei commented on a change in pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-27 Thread GitBox


vvcephei commented on a change in pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#discussion_r565519912



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/BranchedKStream.java
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import java.util.Map;
+
+/**
+ * Branches the records in the original stream based on the predicates supplied for the branch definitions.
+ * 
+ * Branches are defined with {@link BranchedKStream#branch(Predicate, Branched)} or
+ * {@link BranchedKStream#defaultBranch(Branched)} methods. Each record is evaluated against the predicates
+ * supplied via {@link Branched} parameters, and is routed to the first branch for which its respective predicate
+ * evaluates to {@code true}. If a record does not match any predicates, it will be routed to the default branch,
+ * or dropped if no default branch is created.
+ * 
+ * Each branch (which is a {@link KStream} instance) then can be processed either by
+ * a {@link java.util.function.Function} or a {@link java.util.function.Consumer} provided via a {@link Branched}
+ * parameter. If certain conditions are met, it also can be accessed from the {@link Map} returned by
+ * {@link BranchedKStream#defaultBranch(Branched)} or {@link BranchedKStream#noDefaultBranch()}
+ * (see usage examples).
+ * 
+ * The branching happens on first-match: A record in the original stream is assigned to the corresponding result

Review comment:
   Yes, I agree, unless you want to add a noun:
   
   ```suggestion
 * The branching happens on a first-match basis: A record in the original stream is assigned to the corresponding result
   ```
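A dependency-free sketch can illustrate the first-match rule described in the `BranchedKStream` javadoc. Each record goes to the first branch whose predicate returns true. If no predicate matches, the record goes to the default branch, or it is dropped when there is no default branch. `FirstMatchRouter` is a hypothetical class and is not part of Kafka Streams. Branch evaluation order is the order in which branches are added.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Dependency-free sketch of first-match routing; not the Kafka Streams implementation.
final class FirstMatchRouter<T> {
    // LinkedHashMap keeps insertion order, which is also the evaluation order.
    private final Map<String, Predicate<T>> branches = new LinkedHashMap<>();
    // null means records that match no predicate are dropped.
    private final String defaultBranch;

    FirstMatchRouter(String defaultBranch) {
        this.defaultBranch = defaultBranch;
    }

    FirstMatchRouter<T> branch(String name, Predicate<T> predicate) {
        branches.put(name, predicate);
        return this;
    }

    Map<String, List<T>> route(List<T> records) {
        Map<String, List<T>> out = new LinkedHashMap<>();
        for (T record : records) {
            String target = defaultBranch;
            for (Map.Entry<String, Predicate<T>> e : branches.entrySet()) {
                if (e.getValue().test(record)) {
                    target = e.getKey(); // first match wins; later predicates are not consulted
                    break;
                }
            }
            if (target != null) {
                out.computeIfAbsent(target, k -> new ArrayList<>()).add(record);
            }
        }
        return out;
    }
}
```

For example, with branches `small` (`x < 10`) and `even` (`x % 2 == 0`), the value 4 lands only in `small`, even though it also satisfies the second predicate.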









[GitHub] [kafka] junrao commented on a change in pull request #9872: KAFKA-10759: ARM support for Kafka

2021-01-27 Thread GitBox


junrao commented on a change in pull request #9872:
URL: https://github.com/apache/kafka/pull/9872#discussion_r565473406



##
File path: Jenkinsfile
##
@@ -160,5 +160,23 @@ pipeline {
 }
   }
 }
+stage("Arm Build") {
+  agent { label 'arm4' }
+  options {
+timeout(time: 8, unit: 'HOURS')
+timestamps()
+  }
+  environment {

Review comment:
   Do we need to specify the jdk version since scala 2.12 only works on jdk 
8?









[GitHub] [kafka] mdespriee commented on a change in pull request #9738: KAFKA-8744: Update Scala API to give names to processors

2021-01-27 Thread GitBox


mdespriee commented on a change in pull request #9738:
URL: https://github.com/apache/kafka/pull/9738#discussion_r565497044



##
File path: streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KTableTest.scala
##
@@ -391,4 +390,57 @@ class KTableTest extends FlatSpec with Matchers with TestDriver {
 
 testDriver.close()
   }
+
+  "setting a name on a filter processor" should "pass the name to the topology" in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count()
+table
+  .filter((key, value) => key.equals("a") && value == 1, Named.as("my-name"))
+  .toStream
+  .to(sinkTopic)
+
+import scala.jdk.CollectionConverters._
+
+val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(3)
+filterNode.name() shouldBe "my-name"
+  }
+
+  "setting a name on a count processor" should "pass the name to the topology" in {
+val builder = new StreamsBuilder()
+val sourceTopic = "source"
+val sinkTopic = "sink"
+
+val table = builder.stream[String, String](sourceTopic).groupBy((key, _) => key).count(Named.as("my-name"))
+table.toStream.to(sinkTopic)
+
+import scala.jdk.CollectionConverters._
+
+val filterNode = builder.build().describe().subtopologies().asScala.toList(1).nodes().asScala.toList(1)
+filterNode.name() shouldBe "my-name"

Review comment:
   bad copy-paste of variable name.









[GitHub] [kafka] mjsax commented on a change in pull request #9969: MINOR: updated upgrade and architecture for KIP-663, KIP-696, and KIP-671

2021-01-27 Thread GitBox


mjsax commented on a change in pull request #9969:
URL: https://github.com/apache/kafka/pull/9969#discussion_r565515280



##
File path: docs/streams/upgrade-guide.html
##
@@ -91,6 +91,29 @@ Streams API
 We extended StreamJoined to include the options withLoggingEnabled() and withLoggingDisabled() in
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-689%3A+Extend+%60StreamJoined%60+to+allow+more+store+configs;>KIP-689.
 
+
+We added two new methods to Kafka Streams, namely addThread() and removeThread() in
+https://cwiki.apache.org/confluence/x/FDd4CQ;>KIP-663.
+These enabled adding a removing StreamThreads to running KafkaStreams client.
+   
+
+We deprecated setUncaughtExceptionHandler(final Thread.UncaughtExceptionHandler uncaughtExceptionHandler)
+in favor of setUncaughtExceptionHandler(final StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler)
+in https://cwiki.apache.org/confluence/x/lkN4CQ;>KIP-671.
+The default handler will close the client and the client will transit to state ERROR.
+If you implement a custom handler, the new interface allows you to return a StreamThreadExceptionResponse,
+which will determine how the application will respond to a thread failure.
+
+
+Changes in https://cwiki.apache.org/confluence/x/FDd4CQ;>KIP-663 necessitated the KafkaStreams client
+state machine to update, which was done in https://cwiki.apache.org/confluence/x/lCvZCQ;>KIP-696..
+The ERROR state is now terminal with PENDING_ERROR being a transitional state where the resources are closing.
+The ERROR state indicates that there something wrong and should not be blindly restarted without classifying

Review comment:
   `that there something` -> `that there [is] something` ?
   
   `and should not be blindly restarted` -> `and you should not restart 
KafkaStreams blindly` ? (or `and [KafkaStreams] should not be blindly 
restarted`) ?
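The reviewed upgrade note states two rules: ERROR is now terminal, and PENDING_ERROR is a transitional state while resources close. A simplified, dependency-free sketch of just those two rules follows. `ClientState` is a hypothetical enum and is not Kafka's `KafkaStreams.State`. Only three states are modeled. The RUNNING to PENDING_ERROR edge is an assumption about how a client enters the error path.

```java
import java.util.EnumSet;
import java.util.Set;

// Simplified, hypothetical sketch of the rules in the upgrade note; the real
// state machine lives in KafkaStreams.State and has more states and edges.
enum ClientState {
    RUNNING, PENDING_ERROR, ERROR;

    Set<ClientState> validTransitions() {
        switch (this) {
            case RUNNING:
                return EnumSet.of(PENDING_ERROR); // assumed entry into the error path
            case PENDING_ERROR:
                return EnumSet.of(ERROR); // resources are being closed
            default:
                return EnumSet.noneOf(ClientState.class); // ERROR is terminal
        }
    }

    boolean canTransitionTo(ClientState next) {
        return validTransitions().contains(next);
    }
}
```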









[GitHub] [kafka] mjsax commented on pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-27 Thread GitBox


mjsax commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-768478775


   @inponomarev the failing tests seem to be due to a known issue that was 
fixed via https://github.com/apache/kafka/pull/9768
   
   Can you rebase your PR to pickup the fix so we can get a green build?







[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273075#comment-17273075
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck merged pull request #326:
URL: https://github.com/apache/kafka-site/pull/326


   





> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Michael G. Noll
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[jira] [Commented] (KAFKA-8930) MM2 documentation

2021-01-27 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273074#comment-17273074
 ] 

ASF GitHub Bot commented on KAFKA-8930:
---

bbejeck edited a comment on pull request #326:
URL: https://github.com/apache/kafka-site/pull/326#issuecomment-768481852


   > I didn't test this PR locally myself to ensure proper HTML rendering etc.
   
   FWIW I rendered it locally and it seemed fine





> MM2 documentation
> -
>
> Key: KAFKA-8930
> URL: https://issues.apache.org/jira/browse/KAFKA-8930
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Michael G. Noll
>Priority: Minor
>
> Expand javadocs for new MirrorMaker (entrypoint) and MirrorMakerConfig 
> classes. Include example usage and example configuration.
> Expand javadocs for MirrorSourceConnector, MirrorCheckpointConnector, and 
> MirrorHeartbeatConnector, including example configuration for running on 
> Connect w/o mm2 driver.





[GitHub] [kafka] jolshan commented on a change in pull request #9769: KAFKA-10774; Support Describe topic using topic IDs

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9769:
URL: https://github.com/apache/kafka/pull/9769#discussion_r565575532



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1223,7 +1251,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 Set.empty[MetadataResponseTopic]
   else
 unauthorizedForDescribeTopics.map(topic =>
-  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, false, util.Collections.emptyList()))
+  metadataResponseTopic(Errors.TOPIC_AUTHORIZATION_FAILED, topic, Uuid.ZERO_UUID, false, util.Collections.emptyList()))

Review comment:
   So will we never reach this code path when using topic IDs? I think we 
are using topics to decide authorization. So in the case where we use ids and 
the name exists, then we will expose the name and return a zero ID?









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673883



##
File path: core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
##
@@ -34,6 +34,7 @@ import scala.collection.mutable
 
 object KafkaNetworkChannel {
 
+  val nonRoutableAddress = new InetSocketAddress("0.0.0.0", 0)

Review comment:
   Yeah, good catch. The AddressSpec makes sense.









[GitHub] [kafka] aloknnikhil commented on a change in pull request #9985: KAFKA-12237: Support non-routable quorum voter addresses

2021-01-27 Thread GitBox


aloknnikhil commented on a change in pull request #9985:
URL: https://github.com/apache/kafka/pull/9985#discussion_r565673491



##
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##
@@ -36,7 +36,9 @@
 public static final String QUORUM_VOTERS_CONFIG = QUORUM_PREFIX + "voters";
 public static final String QUORUM_VOTERS_DOC = "Map of id/endpoint information for " +
 "the set of voters in a comma-separated list of `{id}@{host}:{port}` entries. " +
-"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094`";
+"For example: `1@localhost:9092,2@localhost:9093,3@localhost:9094.`" +
+"If the voter endpoints are not known at startup, a non-routable address can be provided instead." +

Review comment:
   Fair enough. I can move it there.
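The voter string format documented above uses `{id}@{host}:{port}` entries, and `KafkaNetworkChannel` defines `0.0.0.0:0` as the non-routable placeholder. A small parser can illustrate both. This is a minimal sketch and not Kafka's `RaftConfig` parsing code. The class `QuorumVotersSketch` and the `VoterAddress`/`KnownAddress`/`UnknownAddress` types are hypothetical stand-ins for the `AddressSpec` idea mentioned in the review.

```java
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.Map;

public class QuorumVotersSketch {

    // Hypothetical address types (not Kafka's API): either a concrete endpoint or
    // a placeholder meaning "endpoint not known at startup".
    interface VoterAddress { }

    static final class KnownAddress implements VoterAddress {
        final InetSocketAddress address;

        KnownAddress(InetSocketAddress address) {
            this.address = address;
        }
    }

    static final class UnknownAddress implements VoterAddress {
        static final UnknownAddress INSTANCE = new UnknownAddress();

        private UnknownAddress() { }
    }

    /** Parses a comma-separated list of "{id}@{host}:{port}" entries. */
    static Map<Integer, VoterAddress> parseVoters(String voters) {
        Map<Integer, VoterAddress> result = new LinkedHashMap<>();
        for (String rawEntry : voters.split(",")) {
            String entry = rawEntry.trim();
            int at = entry.indexOf('@');
            int colon = entry.lastIndexOf(':');
            if (at <= 0 || colon <= at + 1 || colon == entry.length() - 1) {
                throw new IllegalArgumentException("Invalid voter entry: " + entry);
            }
            int id = Integer.parseInt(entry.substring(0, at));
            String host = entry.substring(at + 1, colon);
            int port = Integer.parseInt(entry.substring(colon + 1));
            if (host.equals("0.0.0.0") && port == 0) {
                // Same placeholder as nonRoutableAddress in the diff: endpoint unknown for now.
                result.put(id, UnknownAddress.INSTANCE);
            } else {
                // createUnresolved avoids a DNS lookup while parsing configuration.
                result.put(id, new KnownAddress(InetSocketAddress.createUnresolved(host, port)));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, VoterAddress> voters = parseVoters("1@localhost:9092,2@0.0.0.0:0");
        System.out.println(voters.get(1) instanceof KnownAddress);   // true
        System.out.println(voters.get(2) instanceof UnknownAddress); // true
    }
}
```

Keeping "unknown" as its own type means callers cannot accidentally try to connect to the placeholder address.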









[GitHub] [kafka] junrao commented on a change in pull request #9901: KAFKA-12209: Add the timeline data structures for the KIP-631 controller

2021-01-27 Thread GitBox


junrao commented on a change in pull request #9901:
URL: https://github.com/apache/kafka/pull/9901#discussion_r564872645



##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.timeline;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * SnapshottableHashTable implements a hash table that supports creating point-in-time
+ * snapshots.  Each snapshot is immutable once it is created; the past cannot be changed.
+ * We handle divergences between the current state and historical state by copying a
+ * reference to elements that have been deleted or overwritten into the snapshot tiers
+ * in which they still exist.  Each tier has its own hash table.
+ *
+ * In order to retrieve an object from epoch E, we only have to check two tiers: the
+ * current tier, and the tier associated with the snapshot from epoch E.  This design
+ * makes snapshot reads a little faster and simpler, at the cost of requiring us to copy
+ * references into multiple snapshot tiers sometimes when altering the current state.
+ * In general, we don't expect there to be many snapshots at any given point in time,
+ * though.  We expect to use about 2 snapshots at most.
+ *
+ * The current tier's data is stored in the fields inherited from BaseHashTable.  It
+ * would be conceptually simpler to have a separate BaseHashTable object, but since Java
+ * doesn't have value types, subclassing is the only way to avoid another pointer
+ * indirection and the associated extra memory cost.
+ *
+ * In contrast, the data for snapshot tiers is stored in the Snapshot object itself.
+ * We access it by looking up our object reference in the Snapshot's IdentityHashMap.
+ * This design ensures that we can remove snapshots in O(1) time, simply by deleting the
+ * Snapshot object from the SnapshotRegistry.
+ *
+ * As mentioned before, an element only exists in a snapshot tier if the element was
+ * overwritten or removed from a later tier.  If there are no changes between then and
+ * now, there is no data at all stored for the tier.  We don't even store a hash table
+ * object for a tier unless there is at least one change between then and now.
+ *
+ * The class hierarchy looks like this:
+ *
+ *        Revertable       BaseHashTable
+ *              ↑              ↑
+ *           SnapshottableHashTable → SnapshotRegistry → Snapshot
+ *               ↑             ↑
+ *   TimelineHashSet       TimelineHashMap
+ *
+ * BaseHashTable is a simple hash table that uses separate chaining.  The interface is
+ * pretty bare-bones since this class is not intended to be used directly by end-users.
+ *
+ * This class, SnapshottableHashTable, has the logic for snapshotting and iterating over
+ * snapshots.  This is the core of the snapshotted hash table code and handles the
+ * tiering.
+ *
+ * TimelineHashSet and TimelineHashMap are mostly wrappers around this
+ * SnapshottableHashTable class.  They implement standard Java APIs for Set and Map,
+ * respectively.  There's a fair amount of boilerplate for this, but it's necessary so
+ * that timeline data structures can be used while writing idiomatic Java code.
+ * The accessor APIs have two versions -- one that looks at the current state, and one
+ * that looks at a historical snapshotted state.  Mutation APIs only ever mutate thte

Review comment:
   typo thte
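The javadoc above describes a two-tier read path: check the snapshot tier for epoch E, then fall back to the current tier. Plain `HashMap`s are enough to sketch it. This is a simplified illustration under stated assumptions and not the `SnapshottableHashTable` implementation. `TwoTierSnapshotMap` is hypothetical, null values are not supported, and absence at a snapshot is recorded as `Optional.empty()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical, simplified two-tier snapshot map. The current tier holds live data; each
 * snapshot tier holds only the old values of keys changed after that snapshot was taken.
 * Null values are not supported.
 */
public class TwoTierSnapshotMap<K, V> {
    private final Map<K, V> current = new HashMap<>();
    // epoch -> (key -> value as of that epoch); Optional.empty() records "absent at that epoch".
    private final Map<Long, Map<K, Optional<V>>> tiers = new HashMap<>();

    public void createSnapshot(long epoch) {
        tiers.put(epoch, new HashMap<>());
    }

    public void deleteSnapshot(long epoch) {
        tiers.remove(epoch); // drops the whole tier at once; the current tier is untouched
    }

    public V put(K key, V value) {
        preserveOldValue(key);
        return current.put(key, value);
    }

    public V remove(K key) {
        preserveOldValue(key);
        return current.remove(key);
    }

    public V get(K key) {
        return current.get(key);
    }

    /** Reads {@code key} as of the snapshot at {@code epoch}, consulting at most two tiers. */
    public V get(K key, long epoch) {
        Map<K, Optional<V>> tier = tiers.get(epoch);
        if (tier == null) {
            throw new IllegalArgumentException("No snapshot at epoch " + epoch);
        }
        Optional<V> old = tier.get(key);
        if (old != null) {
            return old.orElse(null); // changed since the snapshot: use the preserved value
        }
        return current.get(key); // unchanged since the snapshot
    }

    // Before mutating the current tier, copy the existing value (or its absence) into every
    // snapshot tier that has not already recorded this key; older recordings are kept.
    private void preserveOldValue(K key) {
        Optional<V> old = Optional.ofNullable(current.get(key));
        for (Map<K, Optional<V>> tier : tiers.values()) {
            tier.putIfAbsent(key, old);
        }
    }

    public static void main(String[] args) {
        TwoTierSnapshotMap<String, Integer> map = new TwoTierSnapshotMap<>();
        map.put("a", 1);
        map.createSnapshot(1L);
        map.put("a", 2);
        map.put("b", 5);
        System.out.println(map.get("a", 1L) + " " + map.get("b", 1L) + " " + map.get("a")); // 1 null 2
    }
}
```

Deleting a snapshot drops its whole tier in one step. This mirrors the javadoc's point that removing a snapshot never touches the current tier's data.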

##
File path: 
metadata/src/main/java/org/apache/kafka/timeline/timeline/SnapshottableHashTable.java
##
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565720574



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {

Review comment:
   We generally don't declare this exception in the method signature; we just inform users through the javadocs, as you've done.
   ```suggestion
   public Optional<String> removeStreamThread(final Duration timeout) {
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time

Review comment:
   ```suggestion
* @throws org.apache.kafka.common.errors.TimeoutException if the thread does not stop in time
   ```
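The behavior documented here has two parts: wait a bounded time for the thread to stop, then signal a timeout with an unchecked exception that is described only in the javadoc. `Thread.join(long)` is enough to sketch this. The sketch rests on assumptions and is not the `KafkaStreams` code. `ShutdownTimeoutException` and `stopAndAwait` are hypothetical, and `join` stands in for `waitOnThreadState`.

```java
import java.util.concurrent.CountDownLatch;

public class BoundedThreadShutdown {

    /** Hypothetical unchecked exception: documented in the javadoc, not in a throws clause. */
    public static class ShutdownTimeoutException extends RuntimeException {
        public ShutdownTimeoutException(String message) {
            super(message);
        }
    }

    /**
     * Requests a shutdown and waits at most {@code timeoutMs} for {@code worker} to terminate.
     *
     * @throws ShutdownTimeoutException if the thread is still alive when the wait ends
     */
    public static void stopAndAwait(Thread worker, Runnable requestShutdown, long timeoutMs) {
        requestShutdown.run();
        try {
            if (timeoutMs > 0) {
                worker.join(timeoutMs); // join(0) would mean "wait forever", so it is skipped
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status; the liveness check below still decides the outcome.
            Thread.currentThread().interrupt();
        }
        if (worker.isAlive()) {
            throw new ShutdownTimeoutException(
                "Thread " + worker.getName() + " did not stop in the allotted time");
        }
    }

    public static void main(String[] args) {
        CountDownLatch stopSignal = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                stopSignal.await(); // runs until asked to stop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "worker-1");
        worker.start();

        stopAndAwait(worker, stopSignal::countDown, 5_000);
        System.out.println("stopped: " + !worker.isAlive());
    }
}
```

Note that `Thread.join(0)` waits forever. A zero or negative timeout must therefore skip the join rather than pass the value through.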

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -88,9 +91,11 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.common.errors.TimeoutException;

Review comment:
   nit: move the import to the other `o.a.k.*` imports

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1036,28 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");
+}
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+if (streamThread.getGroupInstanceID().isPresent()) {
+final MemberToRemove memberToRemove = new MemberToRemove(streamThread.getGroupInstanceID().get());
+final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
+final RemoveMembersFromConsumerGroupResult 

[GitHub] [kafka] satishd commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.

2021-01-27 Thread GitBox


satishd commented on a change in pull request #9980:
URL: https://github.com/apache/kafka/pull/9980#discussion_r565730154



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -63,7 +63,7 @@ private[log] object ProducerStateEntry {
   private[log] val NumBatchesToRetain = 5
 
   def empty(producerId: Long) = new ProducerStateEntry(producerId,
-batchMetadata = mutable.Queue[BatchMetadata](),
+batchMetadata = new mutable.Queue[BatchMetadata](5),

Review comment:
   minor: you may want to write it as `new mutable.Queue[BatchMetadata](NumBatchesToRetain)` instead of hardcoding the value directly.
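The following is a Java analogue of a queue pre-sized with the named constant. It assumes, though this diff does not confirm it, that entries beyond `NumBatchesToRetain` are evicted oldest-first. `RecentBatches` and the use of `Long` offsets in place of `BatchMetadata` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class RecentBatches {
    // Named constant rather than a hard-coded literal, as suggested in the review.
    static final int NUM_BATCHES_TO_RETAIN = 5;

    // Pre-sized for the maximum number of entries the deque is expected to hold.
    private final ArrayDeque<Long> batchOffsets = new ArrayDeque<>(NUM_BATCHES_TO_RETAIN);

    /** Adds an entry, first evicting the oldest one if the retention limit is reached. */
    public void add(long offset) {
        if (batchOffsets.size() == NUM_BATCHES_TO_RETAIN) {
            batchOffsets.removeFirst();
        }
        batchOffsets.addLast(offset);
    }

    public List<Long> snapshot() {
        return new ArrayList<>(batchOffsets);
    }

    public static void main(String[] args) {
        RecentBatches batches = new RecentBatches();
        for (long offset = 0; offset < 8; offset++) {
            batches.add(offset);
        }
        System.out.println(batches.snapshot()); // [3, 4, 5, 6, 7]
    }
}
```

Evicting before adding keeps the deque at or below `NUM_BATCHES_TO_RETAIN` entries at all times. It also means the capacity hint and the retention limit come from a single constant.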









[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565731661



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -319,9 +319,9 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 StreamThread.State.PARTITIONS_ASSIGNED);
 return null;
 }).anyTimes();
+EasyMock.expect(thread.getGroupInstanceID()).andReturn(Optional.empty()).anyTimes();

Review comment:
   ```suggestion
   EasyMock.expect(thread.getGroupInstanceID()).andStubReturn(Optional.empty());
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+addStreamStateChangeListener(kafkaStreams);
+startStreamsAndWaitForRunning(kafkaStreams);
+
+final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+stateTransitionHistory.clear();
+assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   It's a bit weird to test this by passing in a negative timeout, but I don't have any good ideas for forcing it to exceed the timeout.
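A timeout can be forced deterministically, without a negative duration, by keeping the thread's shutdown blocked on a latch that the test controls. The sketch below shows the technique with plain JDK threads. `GatedShutdownTest` and `awaitTermination` are hypothetical names, and the sketch does not use the Kafka Streams test utilities.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class GatedShutdownTest {

    /** Returns true if {@code thread} terminated within {@code timeoutMs}; no wait if timeoutMs <= 0. */
    static boolean awaitTermination(Thread thread, long timeoutMs) throws InterruptedException {
        if (timeoutMs > 0) {
            thread.join(timeoutMs);
        }
        return !thread.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch shutdownRequested = new CountDownLatch(1);
        CountDownLatch gate = new CountDownLatch(1); // held closed by the test

        Thread worker = new Thread(() -> {
            try {
                shutdownRequested.await();
                gate.await(); // simulates a shutdown that cannot finish until the test allows it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "stream-thread-sketch");
        worker.start();

        shutdownRequested.countDown();
        // With the gate closed, a positive timeout is guaranteed to expire.
        if (awaitTermination(worker, 100)) {
            throw new AssertionError("expected the shutdown to time out");
        }

        gate.countDown();
        if (!awaitTermination(worker, TimeUnit.SECONDS.toMillis(5))) {
            throw new AssertionError("expected the worker to stop once the gate opened");
        }
        System.out.println("timeout observed, then clean shutdown");
    }
}
```

The same idea would apply to the integration test if the topology included a processor that blocks on a test-owned latch during close. That setup is an assumption and is not shown in the PR.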

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {

Review comment:
   ```suggestion
   public void shouldNotRemoveStreamThreadWithinTimeout() throws Exception {
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new ArrayList<>(threads)) {
 if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Hm, actually, now that I think about it, we should probably continue with 
the cleanup to leave the 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565746851



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -782,8 +783,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
  final Time time) throws StreamsException {
 this.config = config;
 this.time = time;
+
+this.internalTopologyBuilder = internalTopologyBuilder;
+internalTopologyBuilder.rewriteTopology(config);
+
+// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+taskTopology = internalTopologyBuilder.buildTopology();
+globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
+final boolean hasGlobalTopology = globalTaskTopology != null;
+final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
+(hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
+
+try {
+stateDirectory = new StateDirectory(config, time, hasPersistentStores);
+processId = stateDirectory.getProcessId();

Review comment:
   This is the only real change in the constructor, but I had to move a few 
things around and tried to organize them as I went
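The idea behind this change is to read a process UUID from the state directory if one exists, and otherwise generate and persist one, so that `processId` survives restarts. `java.nio.file` is enough to sketch it. The file name `process-id`, its plain-text format, and `StableProcessId` are hypothetical. This is not the `StateDirectory.getProcessId()` implementation.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

public class StableProcessId {
    // Hypothetical file name and format; not what Kafka Streams actually writes.
    static final String PROCESS_ID_FILE = "process-id";

    /** Returns the UUID stored in {@code stateDir}, generating and persisting one on first use. */
    static UUID getOrCreateProcessId(Path stateDir) throws IOException {
        Files.createDirectories(stateDir);
        Path file = stateDir.resolve(PROCESS_ID_FILE);
        if (Files.exists(file)) {
            String stored = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return UUID.fromString(stored);
        }
        UUID processId = UUID.randomUUID();
        // Write to a temporary file, then move it into place so the final file is either
        // absent or complete.
        Path tmp = stateDir.resolve(PROCESS_ID_FILE + ".tmp");
        Files.write(tmp, processId.toString().getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
        return processId;
    }

    public static void main(String[] args) throws IOException {
        Path stateDir = Files.createTempDirectory("streams-state");
        UUID first = getOrCreateProcessId(stateDir);
        UUID second = getOrCreateProcessId(stateDir); // simulates a restart
        System.out.println(first.equals(second)); // true
    }
}
```

This only helps when the application has a state directory that persists across restarts. That is presumably why the constructor now builds the topology first, so it can tell whether persistent stores exist.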









[GitHub] [kafka] skaundinya15 commented on a change in pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


skaundinya15 commented on a change in pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#discussion_r565772802



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java
##
@@ -89,11 +89,25 @@ public MirrorMakerConfig(Map props) {
 public List<SourceAndTarget> clusterPairs() {
 List<SourceAndTarget> pairs = new ArrayList<>();
 Set<String> clusters = clusters();
+Map<String, String> originalStrings = originalsStrings();
+boolean globalHeartbeatsEnabled = MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED_DEFAULT;
+if (originalStrings.containsKey(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+globalHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
 for (String source : clusters) {
 for (String target : clusters) {
-SourceAndTarget sourceAndTarget = new SourceAndTarget(source, target);
 if (!source.equals(target)) {
-pairs.add(sourceAndTarget);
+String clusterPairConfigPrefix = source + "->" + target + ".";
+boolean clusterPairEnabled = Boolean.valueOf(originalStrings.getOrDefault(clusterPairConfigPrefix + "enabled", "false"));
+boolean clusterPairHeartbeatsEnabled = globalHeartbeatsEnabled;
+if (originalStrings.containsKey(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED)) {
+clusterPairHeartbeatsEnabled = Boolean.valueOf(originalStrings.get(clusterPairConfigPrefix + MirrorConnectorConfig.EMIT_HEARTBEATS_ENABLED));
+}
+
+if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {

Review comment:
   ```suggestion
   // By default, all source->target Herder combinations are created even if `x->y.enabled=false`
   // Unless `emit.heartbeats.enabled=false` or `x->y.emit.heartbeats.enabled=false`
   // Reason for this behavior: for a given replication flow A->B with heartbeats, 2 herders are required:
   // B->A for the MirrorHeartbeatConnector (emits heartbeats into A for monitoring replication health)
   // A->B for the MirrorSourceConnector (actual replication flow)
   if (clusterPairEnabled || clusterPairHeartbeatsEnabled) {
   ```
   Looks good to me, just had a small tweak for the `B->A` comment. Thanks!
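The diff and the suggested comment describe a herder-selection rule. A herder for `x->y` is created when `x->y.enabled` is true or heartbeats are enabled, and a per-pair `x->y.emit.heartbeats.enabled` overrides the global `emit.heartbeats.enabled`. The rule can be restated as a standalone function. This is a sketch and not `MirrorMakerConfig` itself. It returns strings instead of `SourceAndTarget`, and the caller passes in the global heartbeat default instead of it being read from `MirrorConnectorConfig`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ClusterPairsSketch {
    // Property name from the review discussion; the global default is supplied by the caller.
    static final String EMIT_HEARTBEATS_ENABLED = "emit.heartbeats.enabled";

    /** Returns the "source->target" pairs for which a herder would be created. */
    static List<String> clusterPairs(Set<String> clusters, Map<String, String> props,
                                     boolean heartbeatsEnabledByDefault) {
        boolean globalHeartbeats = props.containsKey(EMIT_HEARTBEATS_ENABLED)
            ? Boolean.parseBoolean(props.get(EMIT_HEARTBEATS_ENABLED))
            : heartbeatsEnabledByDefault;
        Set<String> sorted = new TreeSet<>(clusters); // sorted only for deterministic output
        List<String> pairs = new ArrayList<>();
        for (String source : sorted) {
            for (String target : sorted) {
                if (source.equals(target)) {
                    continue;
                }
                String prefix = source + "->" + target + ".";
                boolean pairEnabled = Boolean.parseBoolean(props.getOrDefault(prefix + "enabled", "false"));
                // A per-pair setting overrides the global heartbeat setting.
                boolean pairHeartbeats = props.containsKey(prefix + EMIT_HEARTBEATS_ENABLED)
                    ? Boolean.parseBoolean(props.get(prefix + EMIT_HEARTBEATS_ENABLED))
                    : globalHeartbeats;
                // A herder is needed to replicate (pairEnabled) or only to emit heartbeats.
                if (pairEnabled || pairHeartbeats) {
                    pairs.add(source + "->" + target);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        Set<String> clusters = Set.of("A", "B");
        // Replication A->B with heartbeats on: B->A is also needed for the heartbeat connector.
        System.out.println(clusterPairs(clusters, Map.of("A->B.enabled", "true"), true)); // [A->B, B->A]
        // Heartbeats disabled globally: only the enabled flow gets a herder.
        System.out.println(clusterPairs(clusters,
            Map.of("A->B.enabled", "true", EMIT_HEARTBEATS_ENABLED, "false"), true)); // [A->B]
    }
}
```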









[GitHub] [kafka] gardnervickers commented on a change in pull request #9980: MINOR: Reduce size of the ProducerStateEntry batchMetadata queue.

2021-01-27 Thread GitBox


gardnervickers commented on a change in pull request #9980:
URL: https://github.com/apache/kafka/pull/9980#discussion_r565781027



##
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##
@@ -63,7 +63,7 @@ private[log] object ProducerStateEntry {
   private[log] val NumBatchesToRetain = 5
 
   def empty(producerId: Long) = new ProducerStateEntry(producerId,
-batchMetadata = mutable.Queue[BatchMetadata](),
+batchMetadata = new mutable.Queue[BatchMetadata](5),

Review comment:
   Yes, good suggestion :) 









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new ArrayList<>(threads)) {
 if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Hmm, that is interesting. I am not sure. If the thread hasn't been removed, then we don't want to resize the cache. Would removing the thread and then throwing an exception be the right way of doing it? The timeout is essentially saying that removing the thread failed, so is it right to remove it anyway?









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565784940



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new ArrayList<>(threads)) {
 if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Hmm, that is interesting. I am not sure. If the thread hasn't been removed, then we don't want to resize the cache. The timeout is essentially saying that removing the thread failed, so is it right to remove it anyway?









[GitHub] [kafka] chia7712 commented on a change in pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500

2021-01-27 Thread GitBox


chia7712 commented on a change in pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#discussion_r565786393



##
File path: core/src/main/scala/kafka/server/Server.scala
##
@@ -46,6 +46,22 @@ object Server {
 new Metrics(metricConfig, reporters, time, true, metricsContext)
   }
 
+  def initializeMetrics(
+config: KafkaConfig,
+time: Time,
+metaProps: MetaProperties

Review comment:
   It seems to me the properties in `MetaProperties` duplicate those in `KafkaConfig` in this case. Is there any reason we need to pass `MetaProperties`?









[GitHub] [kafka] ableegoldman commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565797234



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new ArrayList<>(threads)) {
 if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");

Review comment:
   It does seem like kind of a gray area. Still, the TimeoutException isn't 
necessarily saying that it failed, just that we didn't wait long enough for it 
to finish the shutdown. But we have at least definitely initiated the shutdown; 
besides, if the thread really is stuck in its shutdown, then it's probably a 
benefit to go ahead with the `removeMembersFromConsumerGroup` call to get it 
kicked out all the sooner.
   
   But, in the end, we really make no guarantees about the application should a 
user choose to ignore the TimeoutException (though they absolutely can). I can 
imagine that some users might choose to just swallow it and decide that they 
don't care if the shutdown is taking a long time. It's hard to say.
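The ordering discussed above (initiate the shutdown, wait up to the timeout, still evict the static member, and only then surface the `TimeoutException`) can be sketched with a plain `Thread`. This is a minimal illustration under assumptions, not the Kafka Streams code: `DelayedTimeoutSketch`, `removeWorker`, and the `removeFromGroup` callback (standing in for the `removeMembersFromConsumerGroup` call) are hypothetical, and `Thread#interrupt` stands in for `StreamThread#shutdown()`.

```java
import java.util.Optional;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class DelayedTimeoutSketch {

    // Hypothetical helper (not the Kafka Streams API): initiate shutdown, wait at most
    // timeoutMs, always perform the group-removal step, and only then report a timeout.
    static void removeWorker(final Thread worker,
                             final Optional<String> groupInstanceId,
                             final long timeoutMs,
                             final Consumer<String> removeFromGroup)
            throws TimeoutException, InterruptedException {
        worker.interrupt();                  // stand-in for StreamThread#shutdown()
        if (timeoutMs > 0) {
            worker.join(timeoutMs);          // join(0) would wait forever, so skip it
        }
        final boolean timedOut = worker.isAlive();

        // The shutdown was initiated either way, so evict the static member right away
        groupInstanceId.ifPresent(removeFromGroup);

        if (timedOut) {
            throw new TimeoutException("Thread " + worker.getName()
                + " did not stop in the allotted time");
        }
    }
}
```

Deferring the throw means a caller that swallows the exception still ends up with the member removed from the group, which is the trade-off the comment describes.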









[jira] [Comment Edited] (KAFKA-9689) Automatic broker version detection to initialize stream client

2021-01-27 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9689?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17272901#comment-17272901
 ] 

feyman edited comment on KAFKA-9689 at 1/28/21, 3:54 AM:
-

The version detection flow leveraging the versioning system is as described in 
the section: Use case: {{group_coordinator}} feature flag in KIP-584.

The code change mainly consists of 3 parts:

1) StreamThread should know whether it is the leader of the consumer group; if 
so, it should periodically query the describeFeatures API to check for feature 
metadata updates.

2) There needs to be a place for the feature metadata in the MemberMetadata: 
either in the assignment (userData), or in a new field in the MemberMetadata 
(which involves a public interface change). The current implementation 
leverages the assignment.

2.1 Each StreamThread puts the feature metadata (EOS feature version) in the 
SubscriptionInfo when subscribing.

2.2 Upon receiving the JoinGroupResp, the leader knows the current broker-side 
feature version; it can put that version (if updated) in the assignment as the 
suggested feature version.

2.3 When a follower receives the assignment in the SyncGroupResp, it learns the 
latest broker-side feature version.

3) The StreamThread should dynamically switch to the new thread producer 
without affecting the existing tasks that

I'm implementing the code in the sequence above. I'm currently on step 2, but 
we need to discuss whether step 2 makes sense; I haven't started step 3 yet.

Questions for [~bchen225242]:

A) For 2.1, we might need to add a new field to SubscriptionInfoData to include 
the client-side feature metadata. This seems OK to me since SubscriptionInfoData 
is Streams-specific and doesn't seem to need a KIP. Thoughts?
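Steps 2.2 and 2.3 amount to a round trip of a version number through the assignment's `userData`. The sketch below assumes a single-int layout; the class name `FeatureVersionUserDataSketch` and the "switch only on a newer version" rule are hypothetical illustrations, not the real Kafka Streams subscription/assignment encoding.

```java
import java.nio.ByteBuffer;

class FeatureVersionUserDataSketch {

    // Hypothetical userData layout: a single 4-byte int holding the suggested
    // broker-side EOS feature version (step 2.2, written by the group leader).
    static ByteBuffer encodeSuggestedVersion(final int brokerFeatureVersion) {
        final ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
        buffer.putInt(brokerFeatureVersion);
        buffer.flip();                       // make the buffer readable from position 0
        return buffer;
    }

    // Step 2.3: a follower decodes the suggestion from its SyncGroup assignment.
    // duplicate() leaves the caller's buffer position untouched.
    static int decodeSuggestedVersion(final ByteBuffer userData) {
        return userData.duplicate().getInt();
    }

    // Step 3 trigger (hypothetical rule): switch only when the broker supports a newer version
    static boolean shouldSwitchProducer(final int currentVersion, final int suggestedVersion) {
        return suggestedVersion > currentVersion;
    }
}
```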

 


was (Author: feyman):
The version detection flow leveraging the versioning system is as described in 
the section: Use case: {{group_coordinator}} feature flag in KIP-584.

The code change mainly contains 3 parts:

1) StreamThread should know whether it is the leader of the consumer group; if 
so, it should periodically query the describeFeatures API to check for feature 
metadata updates.

2) There needs to be a place for the feature metadata in the MemberMetadata: 
either in the assignment (userData), or in a new field in the MemberMetadata 
(which involves a public interface change). The current implementation 
leverages the assignment.

3) The StreamThread should dynamically switch to the new thread producer 
without affecting the existing tasks that

> Automatic broker version detection to initialize stream client
> --
>
> Key: KAFKA-9689
> URL: https://issues.apache.org/jira/browse/KAFKA-9689
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> Eventually we shall deprecate the flag to suppress the EOS thread producer 
> feature; instead, we take a version detection approach on the broker to 
> decide which semantics to use.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565803134



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -54,14 +61,27 @@
 private static final Logger log = LoggerFactory.getLogger(StateDirectory.class);
 static final String LOCK_FILE_NAME = ".lock";
 
+/* The process file is used to persist the process id across restarts.
+ * The version 0 schema consists only of the version number and UUID
+ *
+ * If you need to store additional metadata of the process you can bump the version numberand append new fields.
+ * For compatibility reasons you should only ever add fields, and only by appending them to the end
+ */
+private static final String PROCESS_FILE_NAME = "kafka-streams-process-metadata";
+private static final int PROCESS_FILE_VERSION = 0;

Review comment:
   No idea if we'll ever want to add anything else to this file, but better 
to be safe and forward compatible than sad
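One way to honour the append-only rule in the `PROCESS_FILE_NAME` comment is sketched below. The binary layout (an `int` version followed by the UUID's two `long` halves) and the `ProcessFileSketch` names are assumptions for illustration; the diff does not show how the file is actually serialized.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.UUID;

class ProcessFileSketch {
    static final int PROCESS_FILE_VERSION = 0;

    // Version 0 layout (hypothetical encoding): [int version][long uuidMsb][long uuidLsb]
    static byte[] write(final UUID processId) throws IOException {
        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(PROCESS_FILE_VERSION);
            out.writeLong(processId.getMostSignificantBits());
            out.writeLong(processId.getLeastSignificantBits());
        }
        return bytes.toByteArray();
    }

    // Every version >= 0 starts with the version-0 fields, so the reader only needs
    // that prefix; fields appended by later versions are simply left unread.
    static UUID readProcessId(final byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            final int version = in.readInt();
            if (version < 0) {
                throw new IOException("Invalid process file version " + version);
            }
            return new UUID(in.readLong(), in.readLong());
        }
    }
}
```

Because the reader only consumes the version-0 prefix, a file written by a newer version that appends fields still yields the same process id.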









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565805758



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -416,11 +524,15 @@ private void cleanRemovedTasksCalledByUser() throws Exception {
 logPrefix(), dirName, id),
 exception
 );
-throw exception;

Review comment:
   IDE was giving me a warning









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565806577



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##
@@ -112,6 +118,19 @@ public void createTopics() throws Exception {
 CLUSTER.createTopic(outputTopic, 1, 3);
 }
 
+@After
+public void cleanUp() {
+if (streamInstanceOne != null) {
+streamInstanceOne.close();
+}
+if (streamInstanceTwo != null) {
+streamInstanceTwo.close();
+}
+if (streamInstanceOneRecovery != null) {
+streamInstanceOneRecovery.close();
+}

Review comment:
   There are no logical changes to this test; I just had to refactor it a 
bit because we were creating two copies of the same KafkaStreams at the same 
time (with the same app.dir & state.dir), even though one of them wasn't 
started until much later. Since we do the state initialization inside the 
KafkaStreams constructor, this was no good.









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565823251



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -997,19 +1002,63 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
  * no stream threads are alive
  */
 public Optional<String> removeStreamThread() {
+return removeStreamThread(Long.MAX_VALUE);
+}
+
+/**
+ * Removes one stream thread out of the running stream threads from this Kafka Streams client.
+ * 
+ * The removed stream thread is gracefully shut down. This method does not specify which stream
+ * thread is shut down.
+ * 
+ * Since the number of stream threads decreases, the sizes of the caches in the remaining stream
+ * threads are adapted so that the sum of the cache sizes over all stream threads equals the total
+ * cache size specified in configuration {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG}.
+ *
+ * @param timeout The the length of time to wait for the thread to shutdown
+ * @throws TimeoutException if the thread does not stop in time
+ * @return name of the removed stream thread or empty if a stream thread could not be removed because
+ * no stream threads are alive
+ */
+public Optional<String> removeStreamThread(final Duration timeout) throws TimeoutException {
+final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
+return removeStreamThread(timeoutMs);
+}
+
+private Optional<String> removeStreamThread(final long timeoutMs) throws TimeoutException {
+final long begin = time.milliseconds();
 if (isRunningOrRebalancing()) {
 synchronized (changeThreadCount) {
 // make a copy of threads to avoid holding lock
 for (final StreamThread streamThread : new ArrayList<>(threads)) {
 if (streamThread.isAlive() && (!streamThread.getName().equals(Thread.currentThread().getName())
 || threads.size() == 1)) {
+final Optional<String> groupInstanceID = streamThread.getGroupInstanceID();
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+if (!streamThread.waitOnThreadState(StreamThread.State.DEAD, timeoutMs)) {
+log.warn("Thread " + streamThread.getName() + " did not stop in the allotted time");
+throw new TimeoutException("Thread " + streamThread.getName() + " did not stop in the allotted time");

Review comment:
   Okay, I buy it. I'll delay the exception.









[GitHub] [kafka] dengziming commented on pull request #9982: MINOR: remove some explicit type argument in generator

2021-01-27 Thread GitBox


dengziming commented on pull request #9982:
URL: https://github.com/apache/kafka/pull/9982#issuecomment-768802003


   @chia7712 @cmccabe  Hello, PTAL.







[GitHub] [kafka] chia7712 commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-27 Thread GitBox


chia7712 commented on a change in pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#discussion_r565826783



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##
@@ -1004,10 +998,6 @@ public void testWithRecords(Args args) {
 }

Review comment:
   @g1geordie I filed a patch for the aforementioned idea. Please take a look at 
https://github.com/chia7712/kafka/pull/1/files
   
   It uses an explicit assertion (expected exception or expected value) for all 
parameters instead of just ignoring some of them.
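The difference between skipping a parameter combination with `assume` and asserting an explicit outcome for it can be sketched in plain Java (no JUnit, so the example stays self-contained). The `build` method is a hypothetical stand-in for the code under test, modelled on the fact that ZStandard compression requires record format (magic) v2; in a JUnit 5 test the two branches would be `assertThrows` and `assertEquals`.

```java
class ExplicitAssertSketch {

    // Hypothetical stand-in for the code under test: zstd needs magic >= 2
    static String build(final byte magic, final String compression) {
        if (magic < 2 && compression.equals("zstd")) {
            throw new IllegalArgumentException("zstd requires magic v2");
        }
        return compression + "@v" + magic;
    }

    // Every (magic, compression) pair gets an explicit expectation instead of
    // being silently skipped by an assume(...) call. Returns the number checked.
    static int checkAllCombinations() {
        int checked = 0;
        for (byte magic = 0; magic <= 2; magic++) {
            for (final String compression : new String[] {"none", "gzip", "zstd"}) {
                if (magic < 2 && compression.equals("zstd")) {
                    // Unsupported combination: the exception is the expected outcome
                    boolean threw = false;
                    try {
                        build(magic, compression);
                    } catch (final IllegalArgumentException expected) {
                        threw = true;
                    }
                    if (!threw) {
                        throw new AssertionError("expected failure for " + compression + " with magic " + magic);
                    }
                } else {
                    // Supported combination: assert the concrete result
                    final String expected = compression + "@v" + magic;
                    if (!expected.equals(build(magic, compression))) {
                        throw new AssertionError("unexpected result for " + expected);
                    }
                }
                checked++;
            }
        }
        return checked;
    }
}
```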
   









[jira] [Updated] (KAFKA-10658) ErrantRecordReporter.report always return completed future even though the record is not sent to DLQ topic yet

2021-01-27 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-10658:
---
Affects Version/s: 2.6.0

> ErrantRecordReporter.report always return completed future even though the 
> record is not sent to DLQ topic yet 
> ---
>
> Key: KAFKA-10658
> URL: https://issues.apache.org/jira/browse/KAFKA-10658
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.6.0
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>
> This issue happens when both DLQ and error log are enabled. There is an 
> incorrect filter in the handling of multiple reports, which results in the 
> uncompleted future being filtered out. Hence, users always receive a completed 
> future even though the record is still in the producer buffer.
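The effect described in this issue can be reproduced with `CompletableFuture`. This is an illustrative sketch only: the actual filter in the Connect runtime is not shown here, so `combineBuggy` merely assumes it drops futures that are not yet done, and `CombinedReportFutureSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CombinedReportFutureSketch {

    // Illustrative bug (hypothetical filter, not the actual Kafka code): dropping the
    // futures that are not done yet means allOf() only sees completed futures, so the
    // combined future completes before the DLQ record has been sent.
    static CompletableFuture<Void> combineBuggy(final List<CompletableFuture<Void>> reports) {
        return CompletableFuture.allOf(reports.stream()
            .filter(CompletableFuture::isDone)
            .toArray(n -> new CompletableFuture<?>[n]));
    }

    // Fix: wait on every reporter's future, whether it is pending or not.
    static CompletableFuture<Void> combine(final List<CompletableFuture<Void>> reports) {
        return CompletableFuture.allOf(reports.toArray(new CompletableFuture<?>[0]));
    }
}
```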





[jira] [Commented] (KAFKA-12220) Replace PowerMock by Mockito

2021-01-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273244#comment-17273244
 ] 

Chia-Ping Tsai commented on KAFKA-12220:


[~ijuma] How about splitting the PR by package? 

||package||classes||
|org.apache.kafka.connect.runtime.standalone|1|
|org.apache.kafka.connect.runtime.distributed|3|
|org.apache.kafka.connect.runtime.errors|2|
|org.apache.kafka.connect.runtime.rest|3|
|org.apache.kafka.connect.util|3|
|org.apache.kafka.connect.storage|4|
|org.apache.kafka.connect.runtime|9|


> Replace PowerMock by Mockito
> 
>
> Key: KAFKA-12220
> URL: https://issues.apache.org/jira/browse/KAFKA-12220
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> We are migrating the project from JUnit 4 to JUnit 5 (KAFKA-7339). PowerMock, 
> however, does not fully support JUnit 5 
> (https://github.com/powermock/powermock/issues/830). Hence, we ought to 
> replace PowerMock with Mockito before migrating to JUnit 5, since rewriting all 
> tests that depend on PowerMock can bring a lot of changes.





[GitHub] [kafka] wcarlson5 commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565815347



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
 }
 
 private boolean close(final long timeoutMs) {
-if (state == State.ERROR) {
-log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+if (state == State.ERROR || state == State.NOT_RUNNING) {
+log.info("Streams client is already in the terminal {} state, all resources are closed and the client has stopped.", state);
 return true;
 }
-if (state == State.PENDING_ERROR) {
-log.info("Streams client is in PENDING_ERROR, all resources are being closed and the client will be stopped.");
-if (waitOnState(State.ERROR, timeoutMs)) {
+if (state == State.PENDING_ERROR || state == State.PENDING_SHUTDOWN) {
+log.info("Streams client is in {}, all resources are being closed and the client will be stopped.", state);
+if (state == State.PENDING_ERROR && waitOnState(State.ERROR, timeoutMs)) {
 log.info("Streams client stopped to ERROR completely");
 return true;
+} else if (state == State.PENDING_SHUTDOWN && waitOnState(State.NOT_RUNNING, timeoutMs)) {
+log.info("Streams client stopped to NOT_RUNNING completely");
+return true;
 } else {
-log.info("Streams client cannot transition to ERROR completely within the timeout");
+log.warn("Streams client cannot transition to {}} completely within the timeout", state);

Review comment:
   Using `state` here makes the log message confusing. If the state is 
`PENDING_ERROR`, then the log should say ERROR.
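A sketch of one way to log the target state rather than the current one: map each `PENDING_*` state to the terminal state `close()` waits for. The `State` enum is a simplified stand-in for `KafkaStreams.State`, and `terminalStateFor`/`timeoutMessage` are hypothetical helpers; in the diff, the mapped state would be the argument of the `log.warn(...)` call. Note that the diff's `{}}` placeholder would also print a stray `}` after the state name.

```java
class CloseLogSketch {

    // Simplified stand-in for KafkaStreams.State (constants only, no transition rules)
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, PENDING_ERROR, ERROR }

    // Maps a PENDING_* state to the terminal state close() is waiting for
    static State terminalStateFor(final State pending) {
        switch (pending) {
            case PENDING_ERROR:
                return State.ERROR;
            case PENDING_SHUTDOWN:
                return State.NOT_RUNNING;
            default:
                throw new IllegalArgumentException("Not a pending state: " + pending);
        }
    }

    // Builds the timeout message from the target state, not the current one
    static String timeoutMessage(final State current) {
        return "Streams client cannot transition to " + terminalStateFor(current)
            + " completely within the timeout";
    }
}
```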

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);
+} catch (final IOException e) {
+log.error("Unable to lock the state directory due to unexpected exception", e);
+throw new ProcessorStateException("Failed to lock the state directory during startup", e);
+}
+
+return stateDirLock != null;
+}
+
+public UUID initializeProcessId() {

Review comment:
   Since it doesn't seem that we need to be very thrifty with space for 
this file, would it make sense to write it in a friendlier format that would 
be easier to maintain, e.g. JSON or something? We are already giving it a 
version number...

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
 }
 
 private boolean close(final long timeoutMs) {
-if (state == State.ERROR) {
-log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+if (state == State.ERROR || state == State.NOT_RUNNING) {

Review comment:
   I think this change makes a lot of sense. I don't think it changes the 
final behavior besides avoiding extra state change rejections from the logs, 
but it looks like they are replaced.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -133,6 +152,72 @@ private void configurePermissions(final File file) {
 }
 }
 
+/**
+ * @return true if the state directory was successfully locked
+ */
+private boolean lockStateDirectory() {
+final File lockFile = new File(stateDir, LOCK_FILE_NAME);
+try {
+stateDirLockChannel = FileChannel.open(lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+stateDirLock = tryLock(stateDirLockChannel);

Review comment:
   Is there any case where we might want to release the lock on this state 
directory? It looks like we just hold it.
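If the lock should be released, for example when the client is closed, one option is to tie it to an `AutoCloseable` owner. The sketch uses only `java.nio` (`FileChannel#tryLock`, `FileLock#release`); `StateDirLockSketch` is a hypothetical class rather than the `StateDirectory` implementation, and it assumes the lock should live exactly as long as the owning instance.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class StateDirLockSketch implements AutoCloseable {
    private final FileChannel channel;
    private final FileLock lock;   // null if another process already holds the lock

    StateDirLockSketch(final Path lockFile) throws IOException {
        channel = FileChannel.open(lockFile, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock acquired;
        try {
            acquired = channel.tryLock();   // non-blocking; null if held by another process
        } catch (final IOException | RuntimeException e) {
            channel.close();                // don't leak the channel if locking fails
            throw e;
        }
        lock = acquired;
    }

    boolean isLocked() {
        return lock != null && lock.isValid();
    }

    // Releasing the lock (and closing the channel) lets another instance reuse the directory
    @Override
    public void close() throws IOException {
        if (lock != null && lock.isValid()) {
            lock.release();
        }
        channel.close();
    }
}
```

`tryLock` returns `null` when another process holds the lock, but throws `OverlappingFileLockException` if the same JVM already holds it, which is why the constructor closes the channel on any failure.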









[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-27 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r565825661



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
 if (records.sizeInBytes == 0)
   throw new IllegalArgumentException("Attempt to append an empty record set")
 
-val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-  leaderEpoch = epoch,
-  origin = AppendOrigin.Coordinator)
-new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-  throw new KafkaException("Append failed unexpectedly")
-}, appendInfo.lastOffset)
+handleAndConvertLogAppendInfo(
+  log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+leaderEpoch = epoch,
+origin = AppendOrigin.Coordinator
+  )
+)
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
 if (records.sizeInBytes == 0)
   throw new IllegalArgumentException("Attempt to append an empty record set")
 
-val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-  throw new KafkaException("Append failed unexpectedly")
-}, appendInfo.lastOffset)
+handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = {
+appendInfo.firstOffset match {
+  case Some(firstOffset) =>
+if (firstOffset.relativePositionInSegment == 0) {
+  // Assume that a new segment was created if the relative position is 0
+  log.deleteOldSegments()
+}
+new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+  case None =>
+throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+}
   }
 
   override def lastFetchedEpoch: Int = {
-log.latestEpoch.getOrElse(0)
+log.latestEpoch.getOrElse {
+  latestSnapshotId.map { snapshotId =>
+val logEndOffset = endOffset().offset
+if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+  // Return the epoch of the snapshot when the log is empty
+  snapshotId.epoch
+} else {
+  throw new KafkaException(
+s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " +
+s"and the log start offset ($startOffset)"
+  )
+}
+  }.orElse(0)
+}
   }
 
   override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
 val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch =>
-  new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
+  if (oldestSnapshotId.isPresent() &&
+offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+offsetAndEpoch.leaderEpoch == leaderEpoch) {

Review comment:
   First, thanks a lot for thinking through this code and providing such a 
detailed comment. This code is important to get right.
   
   > the requested epoch is larger than any known epoch.
   
   For this case I decided to throw an exception because the Fetch request 
handling code already checks for this condition and returns an error Fetch 
response. The leader returns an error Fetch response when this invariant is 
violated: `lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch`. In other 
words, based on the current implementation, I think it is a bug if 
`endOffsetForEpoch` returns `Optional.empty()`.
   
   1. 
https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L951-L954
   2. 
https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1618-L1621
   
   > the requested epoch is less than any known epoch we have
   
   When thinking through this case, I convinced myself that the leader can 
determine whether it should send a snapshot simply by comparing the "fetch 
offset" and "last fetched epoch" against the `oldestSnapshotId`. The 
`oldestSnapshotId` is the snapshot with an end offset equal to the log start offset.
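The detection performed by the `endOffsetForEpoch` diff (the epoch cache answers a request for an epoch older than any it knows with the requested epoch and an end offset equal to the log start offset) can be sketched in Java. `SnapshotEpochCheckSketch` and its `OffsetAndEpoch` stand-in are hypothetical illustrations that only mirror the condition in the diff; they are not the Raft module's classes.

```java
import java.util.Optional;

class SnapshotEpochCheckSketch {

    // Minimal stand-in for org.apache.kafka.raft.OffsetAndEpoch (illustration only)
    static final class OffsetAndEpoch {
        final long offset;
        final int epoch;

        OffsetAndEpoch(final long offset, final int epoch) {
            this.offset = offset;
            this.epoch = epoch;
        }
    }

    // Mirrors the diff: the requested epoch predates the log when the cache returned that
    // same epoch with an end offset equal to the oldest snapshot's offset (the log start offset).
    static boolean requestedEpochPredatesLog(final OffsetAndEpoch cacheResult,
                                             final int requestedEpoch,
                                             final Optional<OffsetAndEpoch> oldestSnapshotId) {
        return oldestSnapshotId.isPresent()
            && cacheResult.offset == oldestSnapshotId.get().offset
            && cacheResult.epoch == requestedEpoch;
    }
}
```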
   
   > The current epoch cache implementation handles this by returning the 
requested epoch with an end offset equal to the log start offset. So we detect 
the case here by checking that the returned epoch matches the requested epoch 
and the end offset matches the offset corresponding to the oldest snapshot, 
which should be the same as the log start offset. Right so far?
   
   Correct. My comment here assumes that the fetch offset is between the log 
start offset and log end offset, and that sending a snapshot is not required. 
When thinking through 

[GitHub] [kafka] g1geordie commented on a change in pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-27 Thread GitBox


g1geordie commented on a change in pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#discussion_r565834861



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
##
@@ -1004,10 +998,6 @@ public void testWithRecords(Args args) {
 }

Review comment:
   It sounds like you changed `assume (condition)` to `if (condition) ... 
else ...` in all methods.









[GitHub] [kafka] chia7712 merged pull request #9981: MINOR: Upgrade to Scala 2.12.13

2021-01-27 Thread GitBox


chia7712 merged pull request #9981:
URL: https://github.com/apache/kafka/pull/9981


   







[GitHub] [kafka] jolshan commented on a change in pull request #9684: KAFKA-10764: Add support for returning topic IDs on create, supplying topic IDs for delete

2021-01-27 Thread GitBox


jolshan commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r565783921



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1843,6 +1843,8 @@ class KafkaApis(val requestChannel: RequestChannel,
   .setNumPartitions(-1)
   .setReplicationFactor(-1)
   .setTopicConfigErrorCode(Errors.NONE.code)
+  } else {
+result.setTopicId(controller.controllerContext.topicIds.getOrElse(result.name(), Uuid.ZERO_UUID))

Review comment:
   I've added something like this to ZkAdminManager. Let me know if it 
makes sense.









[GitHub] [kafka] inponomarev commented on pull request #9107: KAFKA-5488: Add type-safe split() operator

2021-01-27 Thread GitBox


inponomarev commented on pull request #9107:
URL: https://github.com/apache/kafka/pull/9107#issuecomment-768769254


   > @inponomarev the failing tests seems to be due to a known issue that was 
fixed via #9768
   > 
   > Can you rebase your PR to pickup the fix so we can get a green build?
   
   Done rebasing, expect the fixes according to your latest review soon!







[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802766



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1208,24 +1227,28 @@ private Thread shutdownHelper(final boolean error) {
 }
 
 private boolean close(final long timeoutMs) {
-if (state == State.ERROR) {
-log.info("Streams client is already in the terminal state ERROR, all resources are closed and the client has stopped.");
+if (state == State.ERROR || state == State.NOT_RUNNING) {

Review comment:
   Something I noticed during testing: I feel it makes sense for the 
handling of ERROR and NOT_RUNNING to parallel each other (same for the PENDING_ 
flavors). This is a slight change in behavior; now if a user calls `close()` 
while the instance is already closing, it will wait for the ongoing shutdown to 
complete before returning (subject to the timeout).









[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565802847



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+addStreamStateChangeListener(kafkaStreams);
+startStreamsAndWaitForRunning(kafkaStreams);
+
+final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+stateTransitionHistory.clear();
+assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   But it isn't consistent, because if the thread removes itself then the 
timeout is never started.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9840: KAFKA-10867: Improved task idling

2021-01-27 Thread GitBox


vvcephei commented on pull request #9840:
URL: https://github.com/apache/kafka/pull/9840#issuecomment-768782409


   Hmm, the Java 8 build appears to have hung after an hour and 58 minutes. 
It's been running for 3 hours and 30 minutes now. This is now the 16th build, 
and there have been multiple Java 8 successes to date, so I think it's 
environmental.
   
   I'll go ahead with the merge.







[GitHub] [kafka] vvcephei merged pull request #9840: KAFKA-10867: Improved task idling

2021-01-27 Thread GitBox


vvcephei merged pull request #9840:
URL: https://github.com/apache/kafka/pull/9840


   







[GitHub] [kafka] abbccdda commented on a change in pull request #9579: KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed

2021-01-27 Thread GitBox


abbccdda commented on a change in pull request #9579:
URL: https://github.com/apache/kafka/pull/9579#discussion_r561594743



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1370,55 +1345,164 @@ class KafkaApis(val requestChannel: RequestChannel,
 !authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, findCoordinatorRequest.data.key))
   requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
 else {
-  // get metadata (and create the topic if necessary)
-  val (partition, topicMetadata) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
+  val (partition, internalTopicName) = CoordinatorType.forId(findCoordinatorRequest.data.keyType) match {
 case CoordinatorType.GROUP =>
-  val partition = groupCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(GROUP_METADATA_TOPIC_NAME, request.context.listenerName)
-  (partition, metadata)
+  (groupCoordinator.partitionFor(findCoordinatorRequest.data.key), GROUP_METADATA_TOPIC_NAME)
 
 case CoordinatorType.TRANSACTION =>
-  val partition = txnCoordinator.partitionFor(findCoordinatorRequest.data.key)
-  val metadata = getOrCreateInternalTopic(TRANSACTION_STATE_TOPIC_NAME, request.context.listenerName)
-  (partition, metadata)
+  (txnCoordinator.partitionFor(findCoordinatorRequest.data.key), TRANSACTION_STATE_TOPIC_NAME)
+  }
 
-case _ =>
-  throw new InvalidRequestException("Unknown coordinator type in FindCoordinator request")
+  val topicMetadata = metadataCache.getTopicMetadata(Set(internalTopicName), request.context.listenerName)
+  def createFindCoordinatorResponse(error: Errors,
+node: Node,
+requestThrottleMs: Int,
+errorMessage: Option[String] = None): FindCoordinatorResponse = {
+new FindCoordinatorResponse(
+  new FindCoordinatorResponseData()
+.setErrorCode(error.code)
+.setErrorMessage(errorMessage.getOrElse(error.message))
+.setNodeId(node.id)
+.setHost(node.host)
+.setPort(node.port)
+.setThrottleTimeMs(requestThrottleMs))
   }
 
-  def createResponse(requestThrottleMs: Int): AbstractResponse = {
-def createFindCoordinatorResponse(error: Errors, node: Node): FindCoordinatorResponse = {
-  new FindCoordinatorResponse(
-  new FindCoordinatorResponseData()
-.setErrorCode(error.code)
-.setErrorMessage(error.message)
-.setNodeId(node.id)
-.setHost(node.host)
-.setPort(node.port)
-.setThrottleTimeMs(requestThrottleMs))
+  val topicCreationNeeded = topicMetadata.headOption.isEmpty
+  if (topicCreationNeeded) {
+if (hasEnoughAliveBrokers(internalTopicName)) {
+  if (shouldForwardRequest(request)) {
+forwardingManager.sendInterBrokerRequest(
+  getCreateTopicsRequest(Seq(internalTopicName)),
+  _ => ())
+  } else {
+val controllerMutationQuota = quotas.controllerMutation.newQuotaFor(request, strictSinceVersion = 6)
+
+val topicConfigs = Map(internalTopicName -> getTopicConfigs(internalTopicName))
+adminManager.createTopics(
+  config.requestTimeoutMs,
+  validateOnly = false,
+  topicConfigs,
+  Map.empty,
+  controllerMutationQuota,
+  _ => ())
+  }
 }
-val responseBody = if (topicMetadata.errorCode != Errors.NONE.code) {
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
-} else {
-  val coordinatorEndpoint = topicMetadata.partitions.asScala
-.find(_.partitionIndex == partition)
-.filter(_.leaderId != MetadataResponse.NO_LEADER_ID)
-.flatMap(metadata => metadataCache.getAliveBroker(metadata.leaderId))
-.flatMap(_.getNode(request.context.listenerName))
-.filterNot(_.isEmpty)
-
-  coordinatorEndpoint match {
-case Some(endpoint) =>
-  createFindCoordinatorResponse(Errors.NONE, endpoint)
-case _ =>
-  createFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+
+requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => createFindCoordinatorResponse(
+  Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode, requestThrottleMs))
+  } else {
+def createResponse(requestThrottleMs: Int): AbstractResponse = {
+  val responseBody = if (topicMetadata.head.errorCode != Errors.NONE.code) {
+   
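
The control flow of the new FindCoordinator path can be summarized in a small, dependency-free Java sketch: when the internal topic has no metadata, creation is requested (forwarded to the controller or done locally) only if enough brokers are alive, and the client always gets COORDINATOR_NOT_AVAILABLE so it retries; once the topic exists, the coordinator is the leader of the partition the key maps to. `FindCoordinatorSketch`, its `Response` type, the map of leader hosts and the string error names are hypothetical simplifications, not Kafka's classes.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of the FindCoordinator flow in the diff above.
final class FindCoordinatorSketch {

    /** Simplified response: an error name plus the coordinator host, if one is known. */
    static final class Response {
        final String error;
        final Optional<String> coordinatorHost;

        Response(String error, Optional<String> coordinatorHost) {
            this.error = error;
            this.coordinatorHost = coordinatorHost;
        }
    }

    static Response handleFindCoordinator(String internalTopic,
                                          int partition,
                                          Map<String, Map<Integer, String>> leaderHostsByTopic,
                                          boolean enoughAliveBrokers,
                                          boolean forwardToController,
                                          List<String> creationRequests) {
        Map<Integer, String> leaderHosts = leaderHostsByTopic.get(internalTopic);
        if (leaderHosts == null) {
            // No metadata for the internal topic yet: request its creation without waiting.
            if (enoughAliveBrokers) {
                creationRequests.add((forwardToController ? "forwarded:" : "local:") + internalTopic);
            }
            // Either way the client gets a retriable error and asks again later.
            return new Response("COORDINATOR_NOT_AVAILABLE", Optional.empty());
        }
        // Topic exists: the coordinator is the leader of the partition the key maps to.
        Optional<String> leader = Optional.ofNullable(leaderHosts.get(partition));
        return leader.isPresent()
                ? new Response("NONE", leader)
                : new Response("COORDINATOR_NOT_AVAILABLE", Optional.empty());
    }
}
```

Keeping creation asynchronous means the request handler never blocks on topic creation; the retriable error pushes the waiting onto the client.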

[GitHub] [kafka] jsancio commented on a change in pull request #9816: KAFKA-10761: Kafka Raft update log start offset

2021-01-27 Thread GitBox


jsancio commented on a change in pull request #9816:
URL: https://github.com/apache/kafka/pull/9816#discussion_r565825661



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -66,32 +78,67 @@ class KafkaMetadataLog(
 if (records.sizeInBytes == 0)
   throw new IllegalArgumentException("Attempt to append an empty record set")
 
-val appendInfo = log.appendAsLeader(records.asInstanceOf[MemoryRecords],
-  leaderEpoch = epoch,
-  origin = AppendOrigin.Coordinator)
-new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-  throw new KafkaException("Append failed unexpectedly")
-}, appendInfo.lastOffset)
+handleAndConvertLogAppendInfo(
+  log.appendAsLeader(records.asInstanceOf[MemoryRecords],
+leaderEpoch = epoch,
+origin = AppendOrigin.Coordinator
+  )
+)
   }
 
   override def appendAsFollower(records: Records): LogAppendInfo = {
 if (records.sizeInBytes == 0)
   throw new IllegalArgumentException("Attempt to append an empty record set")
 
-val appendInfo = log.appendAsFollower(records.asInstanceOf[MemoryRecords])
-new LogAppendInfo(appendInfo.firstOffset.getOrElse {
-  throw new KafkaException("Append failed unexpectedly")
-}, appendInfo.lastOffset)
+handleAndConvertLogAppendInfo(log.appendAsFollower(records.asInstanceOf[MemoryRecords]))
+  }
+
+  private def handleAndConvertLogAppendInfo(appendInfo: kafka.log.LogAppendInfo): LogAppendInfo = {
+appendInfo.firstOffset match {
+  case Some(firstOffset) =>
+if (firstOffset.relativePositionInSegment == 0) {
+  // Assume that a new segment was created if the relative position is 0
+  log.deleteOldSegments()
+}
+new LogAppendInfo(firstOffset.messageOffset, appendInfo.lastOffset)
+  case None =>
+throw new KafkaException(s"Append failed unexpectedly: $appendInfo")
+}
   }
 
   override def lastFetchedEpoch: Int = {
-log.latestEpoch.getOrElse(0)
+log.latestEpoch.getOrElse {
+  latestSnapshotId.map { snapshotId =>
+val logEndOffset = endOffset().offset
+if (snapshotId.offset == startOffset && snapshotId.offset == logEndOffset) {
+  // Return the epoch of the snapshot when the log is empty
+  snapshotId.epoch
+} else {
+  throw new KafkaException(
+s"Log doesn't have a last fetch epoch and there is a snapshot ($snapshotId). " +
+s"Expected the snapshot's end offset to match the log's end offset ($logEndOffset) " +
+s"and the log start offset ($startOffset)"
+  )
+}
+  }.orElse(0)
+}
   }
 
   override def endOffsetForEpoch(leaderEpoch: Int): Optional[raft.OffsetAndEpoch] = {
 val endOffsetOpt = log.endOffsetForEpoch(leaderEpoch).map { offsetAndEpoch =>
-  new raft.OffsetAndEpoch(offsetAndEpoch.offset, offsetAndEpoch.leaderEpoch)
+  if (oldestSnapshotId.isPresent() &&
+offsetAndEpoch.offset == oldestSnapshotId.get().offset &&
+offsetAndEpoch.leaderEpoch == leaderEpoch) {
Review comment:
   First, thanks a lot for thinking through this code and providing such a 
detailed comment. This code is important to get right.
   
   > the requested epoch is larger than any known epoch.
   
   For this case I decided to throw an exception because the Fetch request 
handling code already checks for this condition and returns an error Fetch 
response. The leader returns an error Fetch response when this invariant is 
violated: `lastFetchedEpoch <= currentLeaderEpoch == quorum.epoch`. In other 
words, based on the current implementation, I think it is a bug if 
`endOffsetForEpoch` returns `Optional.empty()`.
   
   1. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L951-L954
   2. https://github.com/apache/kafka/blob/cbe435b34acb1a4563bc7c1f06895d2b52be2672/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L1618-L1621
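
The invariant quoted above amounts to two checks that run before `endOffsetForEpoch` is consulted, which is why an empty result afterwards can be treated as a bug rather than a client error. A minimal Java sketch; `FetchEpochValidation`, its result names and the use of `IllegalStateException` are hypothetical and do not mirror the Raft client's actual error codes.

```java
import java.util.Optional;

// Hypothetical sketch of the check described above:
// lastFetchedEpoch <= currentLeaderEpoch == quorumEpoch
final class FetchEpochValidation {

    enum Result { OK, LEADER_EPOCH_MISMATCH, LAST_FETCHED_EPOCH_TOO_LARGE }

    static Result validateFetch(int quorumEpoch, int currentLeaderEpoch, int lastFetchedEpoch) {
        if (currentLeaderEpoch != quorumEpoch) {
            // The fetcher and the leader disagree about the current epoch.
            return Result.LEADER_EPOCH_MISMATCH;
        }
        if (lastFetchedEpoch > currentLeaderEpoch) {
            // A follower cannot have fetched records from an epoch newer than the current one.
            return Result.LAST_FETCHED_EPOCH_TOO_LARGE;
        }
        return Result.OK;
    }

    // Once validateFetch() returned OK, an empty end offset signals a bug, not a bad request.
    static <T> T requireEndOffset(Optional<T> endOffset, int epoch) {
        return endOffset.orElseThrow(() -> new IllegalStateException("No end offset for epoch " + epoch));
    }
}
```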
   
   > the requested epoch is less than any known epoch we have
   
   When thinking through this case, I convinced myself that the leader can 
determine if it should send a snapshot simply by comparing "fetch offset" and 
"last fetched epoch" against the `oldestSnapshotId`. The `oldestSnapshotId` is 
the snapshot with an end offset equal to the log start offset.
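
One possible reading of that comparison is sketched below in Java, assuming a hypothetical `SnapshotId` holder whose end offset equals the log start offset. It covers only fetch offsets at or before the oldest snapshot and is not the logic from the PR: a fetch offset below the snapshot needs a snapshot, and a fetch offset exactly at the snapshot is only safe when the follower's last fetched epoch matches the snapshot's epoch.

```java
import java.util.Optional;

// Hypothetical sketch: should the leader answer a fetch with a snapshot?
// Assumes the oldest snapshot's end offset equals the log start offset.
final class SnapshotDecision {

    static final class SnapshotId {
        final long endOffset;
        final int epoch;

        SnapshotId(long endOffset, int epoch) {
            this.endOffset = endOffset;
            this.epoch = epoch;
        }
    }

    static boolean shouldSendSnapshot(Optional<SnapshotId> oldestSnapshot,
                                      long fetchOffset,
                                      int lastFetchedEpoch) {
        if (!oldestSnapshot.isPresent()) {
            return false; // in this model, no snapshot means no log prefix was deleted
        }
        SnapshotId snapshot = oldestSnapshot.get();
        if (fetchOffset < snapshot.endOffset) {
            return true; // the requested records now exist only in the snapshot
        }
        // At the boundary the follower must agree with the snapshot's epoch; otherwise
        // there are no older log entries left to locate the divergence point.
        return fetchOffset == snapshot.endOffset && lastFetchedEpoch != snapshot.epoch;
    }
}
```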
   
   > The current epoch cache implementation handles this by returning the 
requested epoch with an end offset equal to the log start offset. So we detect 
the case here by checking that the returned epoch matches the requested epoch 
and the end offset matches the offset corresponding to the oldest snapshot, 
which should be the same as the log start offset. Right so far?
   
   Correct. My comment here assumes that the fetch offset is between the log 
start offset and log end offset, and that sending a snapshot is not required. 
When thinking through 

[GitHub] [kafka] chia7712 commented on pull request #9981: MINOR: Upgrade to Scala 2.12.13

2021-01-27 Thread GitBox


chia7712 commented on pull request #9981:
URL: https://github.com/apache/kafka/pull/9981#issuecomment-768681359


   Build passed.







[GitHub] [kafka] mjsax commented on pull request #9708: KAFKA-9126: KIP-689: StreamJoined changelog configuration

2021-01-27 Thread GitBox


mjsax commented on pull request #9708:
URL: https://github.com/apache/kafka/pull/9708#issuecomment-768685944


   Thanks @lct45!
   
   For reference: https://github.com/apache/kafka/pull/9951 







[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2021-01-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273249#comment-17273249
 ] 

Matthias J. Sax commented on KAFKA-6520:


I am wondering if https://issues.apache.org/jira/browse/KAFKA-10866 (merged 
recently) is something we could exploit to implement a DISCONNECT state? The 
new metadata contains a `receivedTimestamp` field, and thus we could track the 
time difference between "now" and the last received fetch response.
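
The idea could be prototyped along these lines: remember when the last fetch response was received and report a disconnected state once the gap to "now" exceeds a threshold. `DisconnectTracker`, its threshold and the caller-supplied clock values are hypothetical; this is not an existing Kafka Streams API.

```java
// Hypothetical sketch: derive a DISCONNECTED-style signal from the time elapsed
// since the last fetch response was received.
final class DisconnectTracker {

    enum ConnectionState { CONNECTED, DISCONNECTED }

    private final long maxSilenceMs;
    private long lastReceivedMs;

    DisconnectTracker(long maxSilenceMs, long nowMs) {
        this.maxSilenceMs = maxSilenceMs;
        this.lastReceivedMs = nowMs; // treat construction time as the last contact
    }

    /** Called whenever a fetch response arrives, e.g. with its received timestamp. */
    void onFetchResponse(long receivedTimestampMs) {
        lastReceivedMs = Math.max(lastReceivedMs, receivedTimestampMs);
    }

    ConnectionState state(long nowMs) {
        return nowMs - lastReceivedMs > maxSilenceMs
                ? ConnectionState.DISCONNECTED
                : ConnectionState.CONNECTED;
    }
}
```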

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Vince Mu
>Priority: Major
>  Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario, the application is always in the RUNNING 
> state:
>   
>  1) start kafka
>  2) start app, app connects to kafka and starts processing
>  3) kill kafka (stop docker container)
>  4) the application doesn't give any indication that it's no longer 
> connected (Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -
> Update: there are some discussions on the PR itself which lead me to think 
> that a more general solution should be at the ClusterConnectionStates rather 
> than at the Streams or even Consumer level. One proposal would be:
>  * Add a new metric named `failedConnection` in SelectorMetrics which is 
> recorded in the `connect()` and `pollSelectionKeys()` functions upon capturing the 
> IOException / RuntimeException which indicates the connection disconnected.
>  * Users of Consumer / Streams can then monitor this metric, which 
> normally will only have close-to-zero values since we have transient 
> disconnects; if it is spiking, it means the brokers are consistently 
> unavailable, indicating the state.
> [~Yohan123] WDYT?
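
To make the proposed monitoring concrete: a consumer of such a metric could count failed connections in a sliding window and treat a sustained spike, rather than an occasional transient failure, as "brokers unavailable". The plain-Java sketch below assumes a user-chosen window length and threshold; `FailedConnectionMonitor` is hypothetical and does not use Kafka's metrics classes.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: flag "brokers unavailable" when failed connections within a
// sliding time window reach a threshold.
final class FailedConnectionMonitor {

    private final long windowMs;
    private final int threshold;
    private final Deque<Long> failures = new ArrayDeque<>();

    FailedConnectionMonitor(long windowMs, int threshold) {
        this.windowMs = windowMs;
        this.threshold = threshold;
    }

    /** Would be invoked where the IOException / RuntimeException is captured. */
    void recordFailure(long nowMs) {
        failures.addLast(nowMs);
        evictOld(nowMs);
    }

    /** Transient disconnects stay below the threshold; a spike reaches it. */
    boolean brokersLikelyUnavailable(long nowMs) {
        evictOld(nowMs);
        return failures.size() >= threshold;
    }

    private void evictOld(long nowMs) {
        // Drop failures that fell out of the window (oldest first).
        while (!failures.isEmpty() && nowMs - failures.peekFirst() > windowMs) {
            failures.removeFirst();
        }
    }
}
```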



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565785613



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }
 }
 
+@Test
+public void shouldnNotRemoveStreamThreadWithTimeout() throws Exception {
+try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
+addStreamStateChangeListener(kafkaStreams);
+startStreamsAndWaitForRunning(kafkaStreams);
+
+final int oldThreadCount = kafkaStreams.localThreadsMetadata().size();
+stateTransitionHistory.clear();
+assertThrows(TimeoutException.class, () -> kafkaStreams.removeStreamThread(Duration.ZERO.minus(DEFAULT_DURATION)));

Review comment:
   I don't either...









[GitHub] [kafka] chia7712 commented on pull request #9906: KAFKA-10885 Refactor MemoryRecordsBuilderTest/MemoryRecordsTest to avoid a lot of…

2021-01-27 Thread GitBox


chia7712 commented on pull request #9906:
URL: https://github.com/apache/kafka/pull/9906#issuecomment-768769711


   For another, ```testWriteControlBatchNotAllowedMagicV0``` and 
```testWriteControlBatchNotAllowedMagicV1``` are almost the same. Could we merge 
them into a single test case?







[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2021-01-27 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-3745:
--

Assignee: Bill Bejeck

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Bill Bejeck
>Priority: Minor
>  Labels: api, kip
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.
> KIP-149: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner]
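
The difference between the workaround and the proposed extension can be shown without any Kafka dependency. `KeyedValueJoiner` below is a hypothetical interface defined only for this illustration, not a Kafka Streams type; it passes the read-only join key alongside both values, so the joiner no longer needs the key to be copied into one of the values first.

```java
import java.util.function.BiFunction;

final class JoinKeySketch {

    /** Hypothetical joiner that also receives the (read-only) join key. */
    @FunctionalInterface
    interface KeyedValueJoiner<K, V1, V2, VR> {
        VR apply(K readOnlyKey, V1 value1, V2 value2);
    }

    /** Value-only joiner, shaped like the existing ValueJoiner: the key is not visible. */
    static <V1, V2, VR> VR joinValuesOnly(BiFunction<V1, V2, VR> joiner, V1 left, V2 right) {
        return joiner.apply(left, right);
    }

    /** Key-aware join: the key can be used directly to build the joined value. */
    static <K, V1, V2, VR> VR joinWithKey(KeyedValueJoiner<K, V1, V2, VR> joiner, K key, V1 left, V2 right) {
        return joiner.apply(key, left, right);
    }
}
```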





[GitHub] [kafka] mjsax commented on pull request #9420: KAFKA-10604: The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings

2021-01-27 Thread GitBox


mjsax commented on pull request #9420:
URL: https://github.com/apache/kafka/pull/9420#issuecomment-768709672


   @dongjinleekr -- the PR shows merge conflicts. Can you rebase once more? 
Sorry about that.







[jira] [Assigned] (KAFKA-3745) Consider adding join key to ValueJoiner interface

2021-01-27 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-3745:
--

Assignee: (was: Bill Bejeck)

> Consider adding join key to ValueJoiner interface
> -
>
> Key: KAFKA-3745
> URL: https://issues.apache.org/jira/browse/KAFKA-3745
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Priority: Minor
>  Labels: api, kip
>
> In working with Kafka Stream joining, it's sometimes the case that a join key 
> is not actually present in the values of the joins themselves (if, for 
> example, a previous transform generated an ephemeral join key.) In such 
> cases, the actual key of the join is not available in the ValueJoiner 
> implementation to be used to construct the final joined value. This can be 
> worked around by explicitly threading the join key into the value if needed, 
> but it seems like extending the interface to pass the join key along as well 
> would be helpful.
> KIP-149: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-149%3A+Enabling+key+access+in+ValueTransformer%2C+ValueMapper%2C+and+ValueJoiner]





[GitHub] [kafka] hachikuji commented on pull request #9967: KAFKA-12236; New meta.properties logic for KIP-500

2021-01-27 Thread GitBox


hachikuji commented on pull request #9967:
URL: https://github.com/apache/kafka/pull/9967#issuecomment-768742967


   @chia7712 @ijuma Thanks for the comments thus far. This is ready for another 
look when you have time.







[GitHub] [kafka] hachikuji commented on pull request #9589: KAFKA-10710 - Mirror Maker 2 - Create herders only if source->target.enabled=true

2021-01-27 Thread GitBox


hachikuji commented on pull request #9589:
URL: https://github.com/apache/kafka/pull/9589#issuecomment-768756102


   @twobeeb Before I merge, would you mind updating the PR description? Also, I 
will leave it to you to add the doc suggestion from @skaundinya15. 







[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565802173



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -927,6 +912,39 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
 return streamThread;
 }
 
+private static Metrics getMetrics(final StreamsConfig config, final Time time, final String clientId) {
+final MetricConfig metricConfig = new MetricConfig()
+.samples(config.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+.recordLevel(Sensor.RecordingLevel.forName(config.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+.timeWindow(config.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+final List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG,
+  MetricsReporter.class,
+  Collections.singletonMap(StreamsConfig.CLIENT_ID_CONFIG, clientId));
+final JmxReporter jmxReporter = new JmxReporter();
+jmxReporter.configure(config.originals());
+reporters.add(jmxReporter);
+final MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
+  config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
+return new Metrics(metricConfig, reporters, time, metricsContext);
+}
+
+private int getNumStreamThreads(final boolean hasGlobalTopology) {
+final int numStreamThreads;
+if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+log.info("Overriding number of StreamThreads to zero for global-only topology");
+numStreamThreads = 0;
+} else {
+numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+}
+
+if (numStreamThreads == 0 && !hasGlobalTopology) {
+log.error("Topology with no input topics will create no stream threads and no global thread.");
+throw new TopologyException("Topology has no stream threads and no global threads, " +
+"must subscribe to at least one source topic or global table.");
+}
+return numStreamThreads;
Review comment:
   Just tried to factor some of the self-contained logic out into helper 
methods, since I found it incredibly difficult to get oriented within the 
super-long KafkaStreams constructor.
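
Once extracted, the thread-count rule is essentially a pure function, which also makes it easy to unit test. A dependency-free Java sketch of the same decision, with the topology checks replaced by booleans and `IllegalStateException` standing in for `TopologyException` (both substitutions are assumptions for illustration; `StreamThreadCount` is hypothetical):

```java
// Hypothetical, dependency-free version of the rule in getNumStreamThreads above.
final class StreamThreadCount {

    static int numStreamThreads(boolean hasOnlyGlobalTopology,
                                boolean hasGlobalTopology,
                                int configuredThreads) {
        // A global-only topology has no tasks for stream threads, so none are started.
        final int numStreamThreads = hasOnlyGlobalTopology ? 0 : configuredThreads;
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            // Neither stream threads nor a global thread would consume anything.
            throw new IllegalStateException("Topology has no stream threads and no global threads, "
                    + "must subscribe to at least one source topic or global table.");
        }
        return numStreamThreads;
    }
}
```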









[GitHub] [kafka] ableegoldman commented on a change in pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on a change in pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#discussion_r565801932



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -782,8 +782,27 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
  final Time time) throws StreamsException {
 this.config = config;
 this.time = time;
+
+this.internalTopologyBuilder = internalTopologyBuilder;
+internalTopologyBuilder.rewriteTopology(config);
+
+// sanity check to fail-fast in case we cannot build a ProcessorTopology due to an exception
+taskTopology = internalTopologyBuilder.buildTopology();
+globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+
+final boolean hasGlobalTopology = globalTaskTopology != null;
+final boolean hasPersistentStores = taskTopology.hasPersistentLocalStore() ||
+(hasGlobalTopology && globalTaskTopology.hasPersistentGlobalStore());
+
+try {
+stateDirectory = new StateDirectory(config, time, hasPersistentStores);
+processId = stateDirectory.initializeProcessId();
Review comment:
   This is the only logical change in the KafkaStreams constructor; the 
rest of the diff is due to moving things around in order to get everything 
initialized in the proper order.
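
The idea behind `initializeProcessId()` is to reuse a UUID stored in the state directory when one exists and otherwise generate and persist a new one, so the processId stays stable across restarts. A minimal Java sketch using only `java.nio.file`; `ProcessIdStore`, the `process-id` file name and the plain-text format are assumptions rather than what `StateDirectory` actually writes, and locking and corrupt-file handling are omitted.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

// Hypothetical sketch of a processId persisted in the state directory.
final class ProcessIdStore {

    // Hypothetical file name; the real state directory layout may differ.
    static final String PROCESS_ID_FILE = "process-id";

    static UUID initializeProcessId(Path stateDir) throws IOException {
        Files.createDirectories(stateDir);
        Path file = stateDir.resolve(PROCESS_ID_FILE);
        if (Files.exists(file)) {
            // Reuse the id from a previous run so the processId stays stable across restarts.
            String stored = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return UUID.fromString(stored);
        }
        // First start with this state directory: generate and persist a new id.
        UUID processId = UUID.randomUUID();
        Files.write(file, processId.toString().getBytes(StandardCharsets.UTF_8));
        return processId;
    }
}
```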









[GitHub] [kafka] ableegoldman commented on pull request #9978: KAFKA-10716: persist UUID in state directory for stable processId across restarts

2021-01-27 Thread GitBox


ableegoldman commented on pull request #9978:
URL: https://github.com/apache/kafka/pull/9978#issuecomment-768786220


   Not done with the tests, but I'd appreciate some feedback on the non-testing 
code and general idea -- any takers for review? @cadonna @vvcephei 
@guozhangwang @wcarlson5 @lct45 







[GitHub] [kafka] vvcephei commented on pull request #9420: KAFKA-10604: The StreamsConfig.STATE_DIR_CONFIG's default value does not reflect the JVM parameter or OS-specific settings

2021-01-27 Thread GitBox


vvcephei commented on pull request #9420:
URL: https://github.com/apache/kafka/pull/9420#issuecomment-768789634


   Hey @dongjinleekr ,
   
   Sorry for the force-push, but I had to rebase this and resolve a conflict 
before merging. Note that the conflict was from 
462c89e0b436abd56864bea8bbcaf1ab70b7f66e, which re-organized the boolean 
conditions in the StateDirectory constructor, specifically where we warn if the 
state dir is a temp dir.
   
   After resolving the conflict, I noticed there's no test for that warning, so 
I added one to be sure it works. It also looked like the temp dir check could 
actually be a bit simpler, so I just tweaked it rather than leaving a new 
comment for you to address.
   
   I hope this is all ok. I'll let the tests run and merge in the morning, 
unless you have any objections.
   
   Thanks!
   -John







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565820783



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AdjustStreamThreadCountTest.java
##
@@ -180,6 +182,19 @@ public void shouldRemoveStreamThread() throws Exception {
 }

Review comment:
   yes










[jira] [Commented] (KAFKA-12169) Consumer can not know paritions chage when client leader restart with static membership protocol

2021-01-27 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17273328#comment-17273328
 ] 

Boyang Chen commented on KAFKA-12169:
-

In general, the leader should be able to detect a metadata discrepancy between 
its remembered topic metadata and the broker-side metadata. I don't think we have 
any test case that covers both a topic partition change and a leader rejoin at the 
same time, so it's possible and needs some verification. 
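
The check the leader relies on can be modelled as a comparison between the partition counts it remembered at the last assignment and the counts the brokers currently report. The Java sketch below uses plain maps and a hypothetical `MetadataDiscrepancy` helper; it illustrates the comparison only and does not describe where the consumer performs it.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: find topics whose partition count changed since the last assignment.
final class MetadataDiscrepancy {

    static Set<String> changedTopics(Map<String, Integer> rememberedPartitions,
                                     Map<String, Integer> brokerPartitions) {
        Set<String> changed = new TreeSet<>();
        Set<String> allTopics = new TreeSet<>(rememberedPartitions.keySet());
        allTopics.addAll(brokerPartitions.keySet());
        for (String topic : allTopics) {
            // A missing entry on either side (null) also counts as a change.
            if (!Objects.equals(rememberedPartitions.get(topic), brokerPartitions.get(topic))) {
                changed.add(topic);
            }
        }
        return changed;
    }
}
```

A non-empty result, such as a topic growing from 1000 to 2000 partitions, is what should lead to a new rebalance.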

> Consumer can not know paritions chage when client leader restart with static 
> membership protocol
> 
>
> Key: KAFKA-12169
> URL: https://issues.apache.org/jira/browse/KAFKA-12169
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.5.1, 2.6.1
>Reporter: zou shengfu
>Priority: Major
>
> Background: 
>  Kafka consumer services run with the static membership and cooperative rebalance 
> protocols on Kubernetes, and the services restart often due to operational work. When 
> we increased the topic's partitions from 1000 to 2000 and the client leader restarted 
> with an unknown member id at the same time, we found that the consumers did not 
> trigger a rebalance and still consumed only 1000 partitions.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


wcarlson5 commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565607533



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -610,17 +610,32 @@ public void setStreamsUncaughtExceptionHandler(final java.util.function.Consumer
 this.streamsUncaughtExceptionHandler = streamsUncaughtExceptionHandler;
 }
 
-public void waitOnThreadState(final StreamThread.State targetState) {
+public boolean waitOnThreadState(final StreamThread.State targetState, long timeoutMs) {
+if (timeoutMs < 0) {

Review comment:
   This is for the non-timeout uses, i.e. callers that pass a negative timeout.
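The convention in this hunk (a negative `timeoutMs` means "wait without a deadline", otherwise give up and return `false`) can be sketched with a plain monitor. This is a hypothetical illustration, not the `StreamThread` implementation: `StateWaiter`, `setState` and `waitOnState` are invented names, and the sketch propagates `InterruptedException` rather than handling it.

```java
import java.util.concurrent.TimeUnit;

final class StateWaiter<S> {
    private S state;

    StateWaiter(S initial) {
        this.state = initial;
    }

    synchronized void setState(S newState) {
        state = newState;
        notifyAll(); // wake every thread blocked in waitOnState
    }

    /**
     * Blocks until the state equals target. A negative timeoutMs waits
     * indefinitely; otherwise returns false once timeoutMs has elapsed.
     */
    synchronized boolean waitOnState(S target, long timeoutMs) throws InterruptedException {
        if (timeoutMs < 0) {
            while (!state.equals(target)) {
                wait();
            }
            return true;
        }
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (!state.equals(target)) {
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0) {
                return false; // timed out before reaching the target state
            }
            // Timed Object.wait on this monitor; loops again on spurious wakeups.
            TimeUnit.NANOSECONDS.timedWait(this, remainingNs);
        }
        return true;
    }
}
```

Returning a `boolean` lets a caller such as the remove-thread path decide what to do when the thread does not reach `DEAD` in time, while `-1` keeps the old blocking behavior.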

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent);
 }
 
+public String getGroupInstanceID(){
+return mainConsumer.groupMetadata().groupInstanceId().orElse("");

Review comment:
   It seems easier to get it from here than from the config. It looked like I 
might have had to manipulate strings in that case.

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1005,11 +1007,56 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
 || threads.size() == 1)) {
 streamThread.shutdown();
 if (!streamThread.getName().equals(Thread.currentThread().getName())) {
-streamThread.waitOnThreadState(StreamThread.State.DEAD);
+streamThread.waitOnThreadState(StreamThread.State.DEAD, -1);
 }
 threads.remove(streamThread);
 final long cacheSizePerThread = getCacheSizePerThread(threads.size());
 resizeThreadCache(cacheSizePerThread);
+Collection membersToRemove = Collections.singletonList(new MemberToRemove(streamThread.getGroupInstanceID()));

Review comment:
   I ended up getting the `group.instance.id` from the streamThread









[GitHub] [kafka] cadonna commented on a change in pull request #9984: MINOR: add timeout and static group rebalance to remove thread

2021-01-27 Thread GitBox


cadonna commented on a change in pull request #9984:
URL: https://github.com/apache/kafka/pull/9984#discussion_r565615976



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##
@@ -1147,6 +1162,10 @@ public String toString(final String indent) {
 return indent + "\tStreamsThread threadId: " + getName() + "\n" + taskManager.toString(indent);
 }
 
+public String getGroupInstanceID() {

Review comment:
   Why not an `Optional`?
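To make the suggestion concrete, here is a hypothetical sketch of what returning the `Optional` could look like. The consumer's `groupMetadata().groupInstanceId()` already yields an `Optional<String>` (the diff calls `.orElse("")` on it); the class and method names below are invented, and plain strings stand in for the `MemberToRemove` objects used in the real change.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class InstanceIdExample {
    // Hypothetical stand-in for mainConsumer.groupMetadata().groupInstanceId(),
    // which is empty for a dynamic (non-static) member.
    private final Optional<String> groupInstanceId;

    InstanceIdExample(Optional<String> groupInstanceId) {
        this.groupInstanceId = groupInstanceId;
    }

    // Returning the Optional keeps "no static member id" distinct from a real id
    // instead of collapsing it into the empty string.
    Optional<String> groupInstanceId() {
        return groupInstanceId;
    }

    // Only a static member has an instance id that can be removed from the group.
    static List<String> instanceIdsToRemove(InstanceIdExample thread) {
        return thread.groupInstanceId()
            .map(Collections::singletonList)
            .orElse(Collections.emptyList());
    }
}
```

The caller can then skip the member-removal request entirely when the list is empty, rather than sending a removal for an empty instance id.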








