[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests

2022-09-14 Thread Christo Lolov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17605087#comment-17605087
 ] 

Christo Lolov commented on KAFKA-14132:
---

Hello [~ChrisEgerton]! I have followed the colour-coding I have been using in 
https://issues.apache.org/jira/browse/KAFKA-14133. Thank you for the reviews.
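
For anyone picking up one of the remaining tests, here is a minimal sketch of 
the kind of mechanical rewrite involved. The test class and mocked interaction 
are hypothetical, not taken from any of the tests listed below:
{code:java}
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.storage.Converter;

public class ConverterMigrationSketch {
    public void mockitoStyle() {
        byte[] serialized = new byte[] {1, 2, 3};
        Converter converter = mock(Converter.class);
        // EasyMock's expect(...).andReturn(...) plus replay() collapses into
        // Mockito's when(...).thenReturn(...); no replay step is needed.
        when(converter.fromConnectData("topic", Schema.STRING_SCHEMA, "value"))
                .thenReturn(serialized);

        // ... exercise the code under test ...

        // EasyMock/PowerMock's verify(mock) becomes per-interaction verification.
        verify(converter).fromConnectData("topic", Schema.STRING_SCHEMA, "value");
    }
}
{code}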

> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated otherwise in brackets, the tests are in the Connect module.
> A list of tests that, as of 2 August 2022, still need to be moved from 
> PowerMock to Mockito and have neither a Jira issue nor an open pull request 
> that I am aware of:
> {color:#FF8B00}InReview{color}
> {color:#00875A}Merged{color}
>  # ErrorHandlingTaskTest (owner: Divij)
>  # SourceTaskOffsetCommiterTest (owner: Divij)
>  # WorkerMetricsGroupTest (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij)
>  # WorkerSinkTaskThreadedTest (owner: Divij)
>  # {color:#00875A}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875A}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875A}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875A}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # ConnectorsResourceTest
>  # StandaloneHerderTest
>  # KafkaConfigBackingStoreTest
>  # KafkaOffsetBackingStoreTest (owner: Christo) 
> ([https://github.com/apache/kafka/pull/12418])
>  # KafkaBasedLogTest
>  # RetryUtilTest
>  # RepartitionTopicTest (streams) (owner: Christo)
>  # StateManagerUtilTest (streams) (owner: Christo)
> *After the change, the coverage report for the above tests should be greater 
> than or equal to the current coverage.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests

2022-09-14 Thread Christo Lolov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christo Lolov updated KAFKA-14132:
--
Description: 
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated otherwise in brackets, the tests are in the Connect module.

A list of tests that, as of 2 August 2022, still need to be moved from 
PowerMock to Mockito and have neither a Jira issue nor an open pull request 
that I am aware of:

{color:#FF8B00}InReview{color}
{color:#00875A}Merged{color}

 # ErrorHandlingTaskTest (owner: Divij)
 # SourceTaskOffsetCommiterTest (owner: Divij)
 # WorkerMetricsGroupTest (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij)
 # WorkerSinkTaskThreadedTest (owner: Divij)
 # {color:#00875A}WorkerTaskTest{color} (owner: [~yash.mayya])
 # {color:#00875A}ErrorReporterTest{color} (owner: [~yash.mayya])
 # {color:#00875A}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
 # {color:#00875A}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
 # ConnectorsResourceTest
 # StandaloneHerderTest
 # KafkaConfigBackingStoreTest
 # KafkaOffsetBackingStoreTest (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # KafkaBasedLogTest
 # RetryUtilTest
 # RepartitionTopicTest (streams) (owner: Christo)
 # StateManagerUtilTest (streams) (owner: Christo)

*After the change, the coverage report for the above tests should be greater 
than or equal to the current coverage.*

  was:
{color:#de350b}Some of the tests below use EasyMock as well. For those migrate 
both PowerMock and EasyMock to Mockito.{color}

Unless stated otherwise in brackets, the tests are in the Connect module.

A list of tests that, as of 2 August 2022, still need to be moved from 
PowerMock to Mockito and have neither a Jira issue nor an open pull request 
that I am aware of:
 # ErrorHandlingTaskTest (owner: Divij)
 # SourceTaskOffsetCommiterTest (owner: Divij)
 # WorkerMetricsGroupTest (owner: Divij)
 # WorkerSinkTaskTest (owner: Divij)
 # WorkerSinkTaskThreadedTest (owner: Divij)
 # WorkerTaskTest (owner: [~yash.mayya])
 # ErrorReporterTest (owner: [~yash.mayya])
 # RetryWithToleranceOperatorTest (owner: [~yash.mayya])
 # WorkerErrantRecordReporterTest (owner: [~yash.mayya])
 # ConnectorsResourceTest
 # StandaloneHerderTest
 # KafkaConfigBackingStoreTest
 # KafkaOffsetBackingStoreTest (owner: Christo) 
([https://github.com/apache/kafka/pull/12418])
 # KafkaBasedLogTest
 # RetryUtilTest
 # RepartitionTopicTest (streams) (owner: Christo)
 # StateManagerUtilTest (streams) (owner: Christo)

*After the change, the coverage report for the above tests should be greater 
than or equal to the current coverage.*


> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated otherwise in brackets, the tests are in the Connect module.
> A list of tests that, as of 2 August 2022, still need to be moved from 
> PowerMock to Mockito and have neither a Jira issue nor an open pull request 
> that I am aware of:
> {color:#FF8B00}InReview{color}
> {color:#00875A}Merged{color}
>  # ErrorHandlingTaskTest (owner: Divij)
>  # SourceTaskOffsetCommiterTest (owner: Divij)
>  # WorkerMetricsGroupTest (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij)
>  # WorkerSinkTaskThreadedTest (owner: Divij)
>  # {color:#00875A}WorkerTaskTest{color} (owner: [~yash.mayya])
>  # {color:#00875A}ErrorReporterTest{color} (owner: [~yash.mayya])
>  # {color:#00875A}RetryWithToleranceOperatorTest{color} (owner: [~yash.mayya])
>  # {color:#00875A}WorkerErrantRecordReporterTest{color} (owner: [~yash.mayya])
>  # ConnectorsResourceTest
>  # StandaloneHerderTest
>  # KafkaConfigBackingStoreTest
>  # KafkaOffsetBackingStoreTest (owner: Christo) 
> ([https://github.com/apache/kafka/pull/12418])
>  # KafkaBasedLogTest
>  # RetryUtilTest
>  # RepartitionTopicTest (streams) (owner: Christo)
>  # StateManagerUtilTest (streams) (owner: Christo)
> *After the change, the coverage report for the above tests should be greater 
> than or equal to the current coverage.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14156) Built-in partitioner may create suboptimal batches with large linger.ms

2022-09-14 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-14156.
-
  Assignee: Artem Livshits
Resolution: Fixed

Merged the PR to 3.3 and trunk.
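
For reference, the settings in the example in the description below correspond 
to the following producer configuration, and the proposed switch condition can 
be paraphrased as a single predicate. This is an illustrative sketch only, not 
code from the PR:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class PartitionSwitchSketch {
    // The producer settings used in the example in the description.
    static Properties exampleConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.LINGER_MS_CONFIG, 500);    // linger.ms=500
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // batch.size=16KB (default)
        return props;
    }

    // The proposed switch condition, paraphrased; the real logic lives in the
    // producer's record accumulator.
    static boolean shouldSwitchPartition(long bytesProducedToPartition,
                                         int batchSize,
                                         boolean batchIsLingering) {
        return bytesProducedToPartition >= batchSize && !batchIsLingering;
    }
}
{code}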

> Built-in partitioner may create suboptimal batches with large linger.ms
> ---
>
> Key: KAFKA-14156
> URL: https://issues.apache.org/jira/browse/KAFKA-14156
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 3.3.0
>Reporter: Artem Livshits
>Assignee: Artem Livshits
>Priority: Blocker
> Fix For: 3.3.0
>
>
> The new built-in "sticky" partitioner switches partitions based on the amount 
> of bytes produced to a partition.  It doesn't use batch creation as a switch 
> trigger.  The previous "sticky" DefaultPartitioner switched partition when a 
> new batch was created, and with small linger.ms (default is 0) this could 
> result in sending larger batches to slower brokers, potentially overloading 
> them.  See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner
>  for more detail.
> However, with large linger.ms, the new built-in partitioner may create 
> suboptimal batches.  Let's consider an example: suppose linger.ms=500, 
> batch.size=16KB (default) and we produce 24KB / sec, i.e. every 500ms we 
> produce 12KB worth of data.  The new built-in partitioner would switch 
> partition on every 16KB, so we could get into the following batching pattern:
>  * produce 12KB to one partition in 500ms, hit linger, send 12KB batch
>  * produce 4KB more to the same partition, now we've produced 16KB of data, 
> switch partition
>  * produce 12KB to the second partition in 500ms, hit linger, send 12KB batch
>  * in the meantime, the 4KB produced to the first partition would hit linger 
> as well, sending a 4KB batch
>  * produce 4KB more to the second partition, now we've produced 16KB of data 
> to the second partition, switch to 3rd partition
> So in this scenario the new built-in partitioner produces a mix of 12KB and 
> 4KB batches, while the previous DefaultPartitioner would produce only 12KB 
> batches -- it switches on new batch creation, so there are no "mid-linger" 
> leftover batches.
> To avoid batch fragmentation on partition switch, we can wait 
> until the batch is ready before switching the partition, i.e. the condition 
> to switch to a new partition would be "produced batch.size bytes" AND "batch 
> is not lingering".  This may potentially introduce some non-uniformity into 
> data distribution, but unlike the previous DefaultPartitioner, the 
> non-uniformity would not be based on broker performance and won't 
> re-introduce the bad pattern of sending more data to slower brokers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] junrao merged pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches

2022-09-14 Thread GitBox


junrao merged PR #12570:
URL: https://github.com/apache/kafka/pull/12570


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14234) /admin/delete_topics is not in the list of zookeeper watchers

2022-09-14 Thread Yan Xue (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yan Xue updated KAFKA-14234:

Description: 
I deployed the Kafka cluster on Kubernetes and am trying to figure out how 
topic deletion works. I know the Kafka controller has the topic deletion 
manager, which watches for node changes in ZooKeeper. Whenever a topic is 
deleted, the manager is triggered. I expected {{/admin/delete_topics}} to be in 
the watcher list. However, I didn't find it. Sample output:

root@kafka-broker-2:/opt/kafka# echo wchc | nc ZOOKEEPER_IP 2181
0x20010021139
    /admin/preferred_replica_election
    /brokers/ids/0
    /brokers/ids/1
    /brokers/ids/2
    /brokers/topics/__consumer_offsets
    /brokers/ids/3
    /brokers/ids/4
    /controller
    /admin/reassign_partitions
    /brokers/topics/test-test
    /feature
0x200100211390001
    /controller
    /feature
0x1631f9
    /controller
    /feature

 

Even though I can delete the topic, I am confused about the output.

  was:
I deployed the Kafka cluster on Kubernetes and am trying to figure out how 
topic deletion works. I know the Kafka controller has the topic deletion 
manager, which watches for node changes in ZooKeeper. Whenever a topic is 
deleted, the manager is triggered. I expected {{/admin/delete_topics}} to be in 
the watcher list. However, I didn't find it. Sample output:

root@kafka-broker-2:/opt/kafka# echo wchc | nc ZOOKEEPER_IP 2181
0x20010021139
    /admin/preferred_replica_election
    /brokers/ids/0
    /brokers/ids/1
    /brokers/ids/2
    /brokers/topics/__consumer_offsets
    /brokers/ids/3
    /brokers/ids/4
    /controller
    /admin/reassign_partitions
    /brokers/topics/test-test
    /feature
0x200100211390001
    /controller
    /feature
0x1631f9
    /controller
    /feature


> /admin/delete_topics is not in the list of zookeeper watchers
> -
>
> Key: KAFKA-14234
> URL: https://issues.apache.org/jira/browse/KAFKA-14234
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 3.2.1
>Reporter: Yan Xue
>Priority: Minor
>
> I deployed the Kafka cluster on Kubernetes and am trying to figure out how 
> topic deletion works. I know the Kafka controller has the topic deletion 
> manager, which watches for node changes in ZooKeeper. Whenever a topic is 
> deleted, the manager is triggered. I expected {{/admin/delete_topics}} to be 
> in the watcher list. However, I didn't find it. Sample output:
> root@kafka-broker-2:/opt/kafka# echo wchc | nc ZOOKEEPER_IP 2181
> 0x20010021139
>     /admin/preferred_replica_election
>     /brokers/ids/0
>     /brokers/ids/1
>     /brokers/ids/2
>     /brokers/topics/__consumer_offsets
>     /brokers/ids/3
>     /brokers/ids/4
>     /controller
>     /admin/reassign_partitions
>     /brokers/topics/test-test
>     /feature
> 0x200100211390001
>     /controller
>     /feature
> 0x1631f9
>     /controller
>     /feature
>  
> Even though I can delete the topic, I am confused about the output.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14234) /admin/delete_topics is not in the list of zookeeper watchers

2022-09-14 Thread Yan Xue (Jira)
Yan Xue created KAFKA-14234:
---

 Summary: /admin/delete_topics is not in the list of zookeeper 
watchers
 Key: KAFKA-14234
 URL: https://issues.apache.org/jira/browse/KAFKA-14234
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 3.2.1
Reporter: Yan Xue


I deployed the Kafka cluster on Kubernetes and am trying to figure out how 
topic deletion works. I know the Kafka controller has the topic deletion 
manager, which watches for node changes in ZooKeeper. Whenever a topic is 
deleted, the manager is triggered. I expected {{/admin/delete_topics}} to be in 
the watcher list. However, I didn't find it. Sample output:

root@kafka-broker-2:/opt/kafka# echo wchc | nc ZOOKEEPER_IP 2181
0x20010021139
    /admin/preferred_replica_election
    /brokers/ids/0
    /brokers/ids/1
    /brokers/ids/2
    /brokers/topics/__consumer_offsets
    /brokers/ids/3
    /brokers/ids/4
    /controller
    /admin/reassign_partitions
    /brokers/topics/test-test
    /feature
0x200100211390001
    /controller
    /feature
0x1631f9
    /controller
    /feature
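
For context, the controller registers this kind of watch through ZooKeeper's 
child-watch API. A minimal standalone sketch (not Kafka's actual controller 
code) of the call that would make {{/admin/delete_topics}} appear in {{wchc}} 
output for its session:
{code:java}
import java.util.List;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;

public class DeleteTopicsWatchSketch {
    public static void main(String[] args) throws Exception {
        // Connect and register a child watch on /admin/delete_topics; the
        // watch is tied to this session until it fires or the session ends.
        ZooKeeper zk = new ZooKeeper("ZOOKEEPER_IP:2181", 30_000,
                (WatchedEvent event) -> System.out.println("event: " + event));
        List<String> pending = zk.getChildren("/admin/delete_topics", true);
        System.out.println("topics pending deletion: " + pending);
    }
}
{code}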



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] lihaosky commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


lihaosky commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971383169


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java:
##
@@ -31,6 +33,8 @@ public interface ChangelogRegister {
  */
 void register(final TopicPartition partition, final ProcessorStateManager 
stateManager);
 
+void register(final Set<TopicPartition> partition, final 
ProcessorStateManager stateManager);

Review Comment:
   nit: partition -> partitions?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -444,6 +451,8 @@ public void restore(final Map<TaskId, Task> tasks) {
 final Set<TaskId> corruptedTasks = new HashSet<>();
 e.partitions().forEach(partition -> 
corruptedTasks.add(changelogs.get(partition).stateManager.taskId()));
 throw new TaskCorruptedException(corruptedTasks, e);
+} catch (final InterruptException interruptException) {
+throw interruptException;

Review Comment:
   QQ: will `InterruptException` be thrown even without this?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -347,6 +348,12 @@ public void register(final TopicPartition partition, final 
ProcessorStateManager
 }
 }
 
+public void register(final Set<TopicPartition> changelogPartitions, final 
ProcessorStateManager stateManager) {

Review Comment:
   nit: `@Override`



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores, 
final InternalProcess
 processorContext.uninitialize();
 for (final StateStore store : allStores) {
 if (stores.containsKey(store.name())) {
-maybeRegisterStoreWithChangelogReader(store.name());
+if (!stateUpdaterEnabled) {
+maybeRegisterStoreWithChangelogReader(store.name());

Review Comment:
   QQ: this method is called `registerStateStores` but why does it expect 
`store` to be already in `stores`? The only place I can find `stores.put` is 
called is in `registerStore` and in that method, 
`maybeRegisterStoreWithChangelogReader` is called immediately after 
`stores.put` is called. So I'm confused what's the real purpose of this method 
and if `maybeRegisterStoreWithChangelogReader` call here is redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971319874


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
 return results;
 }
 
+@Override
+public AuthorizationResult authorizeByResourceType(

Review Comment:
   mainly I added this because the benchmarks looked really bad without it  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971306933


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
 return results;
 }
 
+@Override
+public AuthorizationResult authorizeByResourceType(

Review Comment:
   If I understand correctly, this implementation was added to take advantage 
of the new binary search approach in the ACL array. IOW, an optimization over 
the default `authorizeByResourceType` impl?
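
   For context, a hypothetical, simplified sketch of why a sorted ACL array 
makes this cheap (not the actual StandardAuthorizerData code): with ACLs 
sorted by (resourceType, resourceName), all ACLs for one resource form a 
contiguous range that binary search can locate in O(log n).
```java
import java.util.Arrays;
import java.util.Comparator;

class AclLookupSketch {
    static final class Acl {
        final String resourceType;
        final String resourceName;
        Acl(String resourceType, String resourceName) {
            this.resourceType = resourceType;
            this.resourceName = resourceName;
        }
    }

    static final Comparator<Acl> ORDER =
        Comparator.comparing((Acl a) -> a.resourceType)
                  .thenComparing(a -> a.resourceName);

    // Index of the first ACL matching (type, name), or -1 if none exists.
    static int firstMatch(Acl[] sortedAcls, String type, String name) {
        int i = Arrays.binarySearch(sortedAcls, new Acl(type, name), ORDER);
        if (i < 0) {
            return -1;
        }
        // binarySearch gives no guarantee which duplicate it lands on, so
        // walk back to the start of the equal range.
        while (i > 0 && ORDER.compare(sortedAcls[i - 1], sortedAcls[i]) == 0) {
            i--;
        }
        return i;
    }
}
```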



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/ClusterMetadataAuthorizer.java:
##
@@ -76,14 +78,15 @@ public interface ClusterMetadataAuthorizer extends 
Authorizer {
 void loadSnapshot(Map<Uuid, StandardAcl> acls);
 
 /**
- * Add a new ACL. Any ACL with the same ID will be replaced.
- */
-void addAcl(Uuid id, StandardAcl acl);
-
-/**
- * Remove the ACL with the given ID.
+ * Add or remove ACLs.
+ *
+ * @param newAcls   The ACLs to add.
+ * @param removedAclIds The ACL IDs to remove.
  */
-void removeAcl(Uuid id);
+void applyAclChanges(

Review Comment:
   We should document that this method does not expect duplicates or allow 
replacing ACL by ID. 



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}
-log.trace("Added ACL {}: {}", id, acl);
-} catch (Throwable e) {
-log.error("addAcl error", e);
-throw e;
-}
+StandardAuthorizerData copyWithAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+Set<Uuid> removedAclIds
+) {
+return copyWithNewAcls(acls, newAclEntries, removedAclIds);
 }
 
-void removeAcl(Uuid id) {
-try {
-StandardAcl acl = aclsById.remove(id);
-if (acl == null) {
-throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+StandardAuthorizerData copyWithNewAcls(
+StandardAclWithId[] existingAcls,
+Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+Set<Uuid> removedAclIds
+) {
+int newSize = existingAcls.length + newAclEntries.size() - 
removedAclIds.size();
+StandardAclWithId[] newAcls = new StandardAclWithId[newSize];
+int numRemoved = 0, j = 0;
+for (int i = 0; i < existingAcls.length; i++) {
+StandardAclWithId aclWithId = existingAcls[i];
+if (removedAclIds.contains(aclWithId.id())) {
+numRemoved++;
+} else {
+newAcls[j++] = aclWithId;
 }
-if (!aclsByResource.remove(acl)) {
-throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
-" from aclsByResource");
+}
+if (numRemoved < removedAclIds.size()) {
+throw new RuntimeException("Only located " + numRemoved + " out of 
" +
+removedAclIds.size() + " removed ACL ID(s). removedAclIds = " +
+removedAclIds.stream().map(a -> 
a.toString()).collect(Collectors.joining(", ")));
+}
+if (!newAclEntries.isEmpty()) {
+int i = 0;
+for (Entry<Uuid, StandardAcl> entry : newAclEntries) {
+newAcls[existingAcls.length + i] = new 
StandardAclWithId(entry.getKey(), entry.getValue());

Review Comment:
   Should this index be offset by the number we removed?
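
   For what it's worth, a minimal standalone sketch of the merge with the 
write index carried through from the removal pass (simplified types, not the 
PR's code):
```java
import java.util.List;
import java.util.Set;

class CopyOnWriteMergeSketch {
    static String[] copyWithChanges(String[] existing, List<String> added, Set<String> removed) {
        String[] merged = new String[existing.length + added.size() - removed.size()];
        int j = 0;
        for (String id : existing) {
            if (!removed.contains(id)) {
                merged[j++] = id; // survivors are packed at the front
            }
        }
        for (String id : added) {
            merged[j++] = id;     // new entries continue at j, not at existing.length
        }
        return merged;
    }
}
```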




[GitHub] [kafka] artemlivshits commented on pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches

2022-09-14 Thread GitBox


artemlivshits commented on PR #12570:
URL: https://github.com/apache/kafka/pull/12570#issuecomment-1247321505

   Looked through the failed tests -- seem unrelated (also ran locally - pass).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971280735


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}

Review Comment:
   Like I said above, we'd have to scan the whole array since we don't have a 
map from id -> acl any more. I don't think it's worth it just for a sanity check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971280346


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}
-log.trace("Added ACL {}: {}", id, acl);
-} catch (Throwable e) {
-log.error("addAcl error", e);
-throw e;
-}
+StandardAuthorizerData copyWithAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+Set<Uuid> removedAclIds
+) {
+return copyWithNewAcls(acls, newAclEntries, removedAclIds);
 }
 
-void removeAcl(Uuid id) {
-try {
-StandardAcl acl = aclsById.remove(id);
-if (acl == null) {
-throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+StandardAuthorizerData copyWithNewAcls(
+StandardAclWithId[] existingAcls,
+Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+Set<Uuid> removedAclIds
+) {
+StandardAclWithId[] newAcls = new StandardAclWithId[
+existingAcls.length + newAclEntries.size() - 
removedAclIds.size()];
+int numRemoved = 0, j = 0;
+for (int i = 0; i < existingAcls.length; i++) {
+StandardAclWithId aclWithId = existingAcls[i];
+if (removedAclIds.contains(aclWithId.id())) {
+numRemoved++;
+} else {
+newAcls[j++] = aclWithId;
 }
-if (!aclsByResource.remove(acl)) {
-throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
-" from aclsByResource");
+}
+if (numRemoved < removedAclIds.size()) {
+throw new RuntimeException("Only located " + numRemoved + " out of 
" +
+removedAclIds.size() + " removed ACL ID(s). removedAclIds = " +
+removedAclIds.stream().map(a -> 
a.toString()).collect(Collectors.joining(", ")));
+}
+if (!newAclEntries.isEmpty()) {
+int i = 0;
+for (Entry<Uuid, StandardAcl> entry : newAclEntries) {
+newAcls[existingAcls.length + i] = new 
StandardAclWithId(entry.getKey(), entry.getValue());
+i++;
 }

Review Comment:
   Duplicate IDs should not happen unless there is a bug. I do wish we could 
check for it here, but it would be very inefficient to do so, since we'd have 
to scan the whole array.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971279173


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
 return results;
 }

Review Comment:
   The purpose of doing this is to avoid loading the volatile multiple times. 
Each time we load a volatile, it is expensive because it requires an 
interlocked instruction.
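
   Illustrative sketch of the pattern being described (not the actual 
StandardAuthorizer code): read the volatile field once into a local, then use 
the local for the rest of the method.
```java
class VolatileLoadSketch {
    private volatile int[] data = new int[0];

    int sum() {
        int[] curData = data;   // single volatile load
        int total = 0;
        for (int v : curData) { // every further access goes through the local
            total += v;
        }
        return total;
    }
}
```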



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation

2022-09-14 Thread Idrissi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604951#comment-17604951
 ] 

Idrissi commented on KAFKA-6221:


Hello, I am facing the same problem with the following message:
 * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread.}} 
Many topics are created with 6 partitions and replication factor 3. The problem 
seems to be triggered by the replica fetcher thread while synchronizing with 
the leader. Do you think it's the same problem as mentioned in this JIRA 
ticket?

> ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic 
> creation 
> ---
>
> Key: KAFKA-6221
> URL: https://issues.apache.org/jira/browse/KAFKA-6221
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0
> Environment: RHEL 7
>Reporter: Alex Dunayevsky
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> This issue appeared to happen frequently on 0.10.2.0. 
> On 0.10.2.1 and 1.0.0 it's way harder to reproduce. 
> We'll focus on reproducing it on 0.10.2.1 and 1.0.0.
> *TOPOLOGY:* 
>   3 brokers, 1 zk.
> *REPRODUCING STRATEGY:* 
> Create a few dozen topics (say, 40) one by one, each with replication factor 
> 2. The number of partitions generally does not matter but, for easier 
> reproduction, should not be too small (around 30 or so). 
> *CREATE 40 TOPICS:*
> {code:java} for i in {1..40}; do bin/kafka-topics.sh --create --topic 
> "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; 
> done {code}
> *ERRORS*
> {code:java}
> *BROKER 1*
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,27] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,27] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,9] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,9] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,3] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,3] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,15] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,15] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,21] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,21] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> *BROKER 2*
> [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for 
> partition [topic20_p28_r2,12] to broker 
> 3:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:36,408] ERROR [ReplicaFetcherThread-0-3], Error for 
> partition [topic20_p28_r2,12] to broker 
> 

[jira] [Comment Edited] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation

2022-09-14 Thread Idrissi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604951#comment-17604951
 ] 

Idrissi edited comment on KAFKA-6221 at 9/14/22 8:18 PM:
-

Hello, I am facing the same problem with the following message:
 * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread.}}

 

{{Many topics are created with 6 partitions and replication factor 3. The 
problem seems to be triggered by the replica fetcher thread during the 
synchronization process with the leader. Do you think it's the same problem as 
mentioned in this JIRA ticket?}}


was (Author: JIRAUSER295755):
Hello , i am facing the same problem with the following message :
 * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread .}}

 

{{Many topics are created with 6 partitions and replication factor 3. The 
problems seems to be triggered by the Replica Fetcher thread during the 
synchronizing with the leader . Do you think its the same problem as mentionned 
in this JIRA ticket ?}}{{ }}

> ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic 
> creation 
> ---
>
> Key: KAFKA-6221
> URL: https://issues.apache.org/jira/browse/KAFKA-6221
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0
> Environment: RHEL 7
>Reporter: Alex Dunayevsky
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> This issue appeared to happen frequently on 0.10.2.0. 
> On 0.10.2.1 and 1.0.0 it's way harder to reproduce. 
> We'll focus on reproducing it on 0.10.2.1 and 1.0.0.
> *TOPOLOGY:* 
>   3 brokers, 1 zk.
> *REPRODUCING STRATEGY:* 
> Create a few dozen topics (say, 40) one by one, each with replication factor 
> 2. The number of partitions generally does not matter but, for easier 
> reproduction, should not be too small (around 30 or so). 
> *CREATE 40 TOPICS:*
> {code:java} for i in {1..40}; do bin/kafka-topics.sh --create --topic 
> "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; 
> done {code}
> *ERRORS*
> {code:java}
> *BROKER 1*
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,27] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,27] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,9] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,9] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,3] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,3] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,15] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,15] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,21] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,21] to broker 
> 

[jira] [Comment Edited] (KAFKA-6221) ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic creation

2022-09-14 Thread Idrissi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604951#comment-17604951
 ] 

Idrissi edited comment on KAFKA-6221 at 9/14/22 8:18 PM:
-

Hello , i am facing the same problem with the following message :
 * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread .}}

 

{{Many topics are created with 6 partitions and replication factor 3. The 
problems seems to be triggered by the Replica Fetcher thread during the 
synchronizing with the leader . Do you think its the same problem as mentionned 
in this JIRA ticket ?}}{{ }}


was (Author: JIRAUSER295755):
Hello , i am facing the same problem with the following message :
 * {{Message: Broker: Unknown topic or partition from ReplicaFetcherThread 
.Many topics are created with 6 partitions and replication factor 3. The 
problems seems to be triggered by the Replica Fetcher thread during the 
synchronizing with the leader . Do you think its the same problem as mentionned 
in this JIRA ticket ?}}{{ }}

> ReplicaFetcherThread throws UnknownTopicOrPartitionException on topic 
> creation 
> ---
>
> Key: KAFKA-6221
> URL: https://issues.apache.org/jira/browse/KAFKA-6221
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.1.1, 0.10.2.0, 0.10.2.1, 0.11.0.1, 1.0.0
> Environment: RHEL 7
>Reporter: Alex Dunayevsky
>Priority: Minor
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> This issue appeared to happen frequently on 0.10.2.0. 
> On 0.10.2.1 and 1.0.0 it's way harder to reproduce. 
> We'll focus on reproducing it on 0.10.2.1 and 1.0.0.
> *TOPOLOGY:* 
>   3 brokers, 1 zk.
> *REPRODUCING STRATEGY:* 
> Create a few dozen topics (say, 40) one by one, each with replication factor 
> 2. The number of partitions generally does not matter but, for easier 
> reproduction, should not be too small (around 30 or so). 
> *CREATE 40 TOPICS:*
> {code:java} for i in {1..40}; do bin/kafka-topics.sh --create --topic 
> "topic${i}_p28_r2" --partitions 28 --replication-factor 2 --zookeeper :2165; 
> done {code}
> *ERRORS*
> {code:java}
> *BROKER 1*
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,27] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,853] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,27] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,9] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,9] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,3] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,3] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,15] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,15] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,21] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> server does not host this topic-partition. (kafka.server.ReplicaFetcherThread)
> [2017-11-15 16:46:00,854] ERROR [ReplicaFetcherThread-0-2], Error for 
> partition [topic1_p28_r2,21] to broker 
> 2:org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This 
> 

[GitHub] [kafka] jolshan opened a new pull request, #12643: KAFKA-14097: make producer ID expiration a dynamic config

2022-09-14 Thread GitBox


jolshan opened a new pull request, #12643:
URL: https://github.com/apache/kafka/pull/12643

   Changed the integer value for producer ID expiration to a dynamically 
configurable value.
   
   Tested that the configuration can be changed dynamically and works as 
expected.
   
   I considered adding the producer expiration check interval into the config, 
but since it is not dynamic (and is harder to make it dynamic), I decided to 
keep it separate. A followup could be to incorporate all the producer state 
manager configs into this new object.
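   
   For anyone wanting to exercise this, a sketch of flipping the config at 
runtime via the Admin client. The config name `producer.id.expiration.ms` is 
assumed from the PR's scope; check the merged change for the exact name:
```java
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DynamicProducerIdExpirySketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // An empty resource name targets the cluster-wide broker default.
            ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "");
            Collection<AlterConfigOp> ops = Collections.singletonList(
                new AlterConfigOp(new ConfigEntry("producer.id.expiration.ms", "86400000"),
                                  AlterConfigOp.OpType.SET));
            admin.incrementalAlterConfigs(Map.of(broker, ops)).all().get();
        }
    }
}
```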
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r971205378


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.
+If process.roles is not set at all, it is assumed to be 
in ZooKeeper mode.
+  
+
+  Nodes that act as both brokers and controllers are referred to as 
"combined" nodes. Combined nodes are simpler to operate for simple use cases 
like a development environment. The key disadvantage is that the controller 
will be less isolated from the rest of the system. Combined mode is not 
recommended in critical deployment environments.
+
+
+  Controllers
+
+  In KRaft mode, only a small group of specially selected servers can act 
as controllers (unlike the ZooKeeper-based mode, where any server can become 
the Controller). The specially selected controller servers will participate in 
the metadata quorum. Each controller server is either active, or a hot standby 
for the current active controller server.
+
+  A Kafka cluster will typically select 3 or 5 servers for this role, 
depending on factors like cost and the number of concurrent failures your 
system should withstand without availability impact. A majority of the 
controllers must be alive in order to maintain availability. With 3 
controllers, the cluster can tolerate 1 controller failure; with 5 controllers, 
the cluster can tolerate 2 controller failures.
+
+  All of the servers in a Kafka cluster discover the quorum voters using 
the controller.quorum.voters property. This identifies the quorum 
controller servers that should be used. All the controllers must be enumerated. 
Each controller is identified with its id, host and 
port information. This is an example configuration: 
controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3
+
If the Kafka cluster has 3 controllers named controller1, controller2 and 
controller3, then controller1 may have the following:
+
+  
+process.roles=controller
+node.id=1
+listeners=CONTROLLER://controller1.example.com:9093
controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093
+
+  Every broker and controller must set the 
controller.quorum.voters property. The node ID supplied in the 
controller.quorum.voters property must match the corresponding id 
on the controller servers. For example, on controller1, node.id must be set to 
1, and so forth. Each node ID must be unique across all the nodes in a 
particular cluster. No two nodes can have the same node ID regardless of their 
process.roles values.
+
+  Storage Tool
+  
+  The kafka-storage.sh random-uuid command can be used to 
generate a cluster ID for your new cluster. This cluster ID must be used when 
formatting each node in the cluster with the kafka-storage.sh 
format command.
+
+  This is different from how Kafka has operated in the past. Previously, 
Kafka would format blank storage directories automatically, and also generate a 
new cluster ID automatically. One reason for the change is that auto-formatting 
can sometimes obscure an error condition. This is particularly important for 
the metadata log maintained by the controller and broker servers. If a majority 
of the controllers were able to start with an empty log directory, a leader 
might be elected with missing committed data.
+
+  Debugging
+
+  Metadata Quorum 
Tool
+
+  The kafka-metadata-quorum tool can be used to describe the 
runtime state of the cluster metadata partition. For example, the following 
command displays a summary of the metadata quorum:
+
+ 
bin/kafka-metadata-quorum.sh --bootstrap-server broker_host:port describe 
--status
+ClusterId:  fMCL8kv1SWm87L_Md-I2hg
+LeaderId:   3002
+LeaderEpoch:2
+HighWatermark:  10
+MaxFollowerLag: 0
+MaxFollowerLagTimeMs:   -1
+CurrentVoters:  [3000,3001,3002]
+CurrentObservers:   [0,1,2]
+
+  Dump Log Tool
+
+  The kafka-dump-log tool can be used to debug the log 
segments and snapshots for the cluster metadata directory. The tool will scan 
the provided files and decode the metadata records. For example, this command 
decodes and prints the records in the first log segment:
+
+ 
bin/kafka-dump-log.sh --cluster-metadata-decoder --skip-record-metadata --files 
metadata_log_dir/__cluster_metadata-0/.log

Review Comment:
   can we leave off `--skip-record-metadata`? I recall it making the output a 
bit weird. also, 

[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r971204480


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.
+If process.roles is not set at all, it is assumed to be 
in ZooKeeper mode.
+  
+
+  Nodes that act as both brokers and controllers are referred to as 
"combined" nodes. Combined nodes are simpler to operate for simple use cases 
like a development environment. The key disadvantage is that the controller 
will be less isolated from the rest of the system. Combined mode is not 
recommended in critical deployment environments.
+
+
+  Controllers
+
+  In KRaft mode, only a small group of specially selected servers can act 
as controllers (unlike the ZooKeeper-based mode, where any server can become 
the Controller). The specially selected controller servers will participate in 
the metadata quorum. Each controller server is either active, or a hot standby 
for the current active controller server.
+
+  A Kafka cluster will typically select 3 or 5 servers for this role, 
depending on factors like cost and the number of concurrent failures your 
system should withstand without availability impact. A majority of the 
controllers must be alive in order to maintain availability. With 3 
controllers, the cluster can tolerate 1 controller failure; with 5 controllers, 
the cluster can tolerate 2 controller failures.
+
+  All of the servers in a Kafka cluster discover the quorum voters using 
the controller.quorum.voters property. This identifies the quorum 
controller servers that should be used. All the controllers must be enumerated. 
Each controller is identified with its id, host and 
port information. This is an example configuration: 
controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3

Review Comment:
   can we put the example configuration in a PRE or CODE block, or whatever, so 
it shows as monospace? (Just a thought)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r971203656


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.
+If process.roles is not set at all, it is assumed to be 
in ZooKeeper mode.
+  
+
+  Nodes that act as both brokers and controllers are referred to as 
"combined" nodes. Combined nodes are simpler to operate for simple use cases 
like a development environment. The key disadvantage is that the controller 
will be less isolated from the rest of the system. Combined mode is not 
recommended in critical deployment environments.
+
+
+  Controllers
+
+  In KRaft mode, only a small group of specially selected servers can act 
as controllers (unlike the ZooKeeper-based mode, where any server can become 
the Controller). The specially selected controller servers will participate in 
the metadata quorum. Each controller server is either active, or a hot standby 
for the current active controller server.
+
+  A Kafka cluster will typically select 3 or 5 servers for this role, 
depending on factors like cost and the number of concurrent failures your 
system should withstand without availability impact. A majority of the 
controllers must be alive in order to maintain availability. With 3 
controllers, the cluster can tolerate 1 controller failure; with 5 controllers, 
the cluster can tolerate 2 controller failures.

Review Comment:
   How about
   > A Kafka admin will typically select...
   the Kafka cluster hasn't achieved sentience  YET. :)









[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r971203144


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.
+If process.roles is not set at all, it is assumed to be 
in ZooKeeper mode.
+  
+
+  Nodes that act as both brokers and controllers are referred to as 
"combined" nodes. Combined nodes are simpler to operate for simple use cases 
like a development environment. The key disadvantage is that the controller 
will be less isolated from the rest of the system. Combined mode is not 
recommended in critical deployment environments.

Review Comment:
   Maybe add an example of how operations are harder like "it is not possible 
to roll the controllers separately from the brokers when in combined mode" ?






[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r971201992


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:
+
+  
+If process.roles is set to broker, the 
server acts as a broker.
+If process.roles is set to controller, the 
server acts as a controller.
+If process.roles is set to 
broker,controller, the server acts as a broker and a 
controller.

Review Comment:
   How about adding "both" ?
   > the server acts as *both* a broker and a controller.






[GitHub] [kafka] cmccabe commented on a diff in pull request #12642: KAFKA-14207; KRaft Operations documentation

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12642:
URL: https://github.com/apache/kafka/pull/12642#discussion_r971201651


##
docs/ops.html:
##
@@ -3180,6 +3180,119 @@ 6.10 KRaft
+
+  Configuration
+
+  Process Roles
+
+  In KRaft mode each Kafka server can be configured as a controller, as a 
broker or as both using the process.roles property. This property 
can have the following values:

Review Comment:
   grammar: how about
   > as a controller, a broker, or both by using the process.roles property






[GitHub] [kafka] jsancio opened a new pull request, #12642: KAFKA-14207; KRaft Operations documentation

2022-09-14 Thread GitBox


jsancio opened a new pull request, #12642:
URL: https://github.com/apache/kafka/pull/12642

   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] abbccdda closed pull request #10156: KAFKA-10345: File watch store reloading

2022-09-14 Thread GitBox


abbccdda closed pull request #10156: KAFKA-10345: File watch store reloading 
URL: https://github.com/apache/kafka/pull/10156





[GitHub] [kafka] gitlw commented on pull request #12634: KAFKA-14225: Fix deadlock caused by lazy val exemptSensor

2022-09-14 Thread GitBox


gitlw commented on PR #12634:
URL: https://github.com/apache/kafka/pull/12634#issuecomment-1247113089

   rerun tests please





[GitHub] [kafka] nizhikov commented on pull request #12632: KAFKA-12878 Support --bootstrap-server in kafka-streams-application-reset tool

2022-09-14 Thread GitBox


nizhikov commented on PR #12632:
URL: https://github.com/apache/kafka/pull/12632#issuecomment-1247109394

   I've checked the test failures.
   They do not look related to these changes.





[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971116340


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}

Review Comment:
   I think we lost this existing ID and duplicate ACL check in the new array 
code. 






[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971108745


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
 }
 
 @Override
-public void addAcl(Uuid id, StandardAcl acl) {
-data.addAcl(id, acl);
-}
-
-@Override
-public void removeAcl(Uuid id) {
-data.removeAcl(id);
+public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+data = data.copyWithAllNewAcls(acls.entrySet());
 }
 
 @Override
-public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-data = data.copyWithNewAcls(acls.entrySet());
+public synchronized void applyAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   Oh, I must have been looking at StandardAclWithId :), this looks good as-is






[GitHub] [kafka] akhileshchg commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


akhileshchg commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971088491


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
 private final DefaultRule defaultRule;
 
 /**
- * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ * An immutable array of all the current ACLs sorted by (resource type, 
resource name).
  */
-private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   We're storing `Id` with the `StandardAcl`. Shouldn't that make it unique? I 
think since it is sorted, we can maybe have a conservative check to make sure 
there are no duplicate ids. 



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
 }
 
 @Override
-public void addAcl(Uuid id, StandardAcl acl) {
-data.addAcl(id, acl);
-}
-
-@Override
-public void removeAcl(Uuid id) {
-data.removeAcl(id);
+public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+data = data.copyWithAllNewAcls(acls.entrySet());
 }
 
 @Override
-public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-data = data.copyWithNewAcls(acls.entrySet());
+public synchronized void applyAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   I don't think `StandardAcl` has an id with it. We have a different data 
structure for that, `StandardAclWithId`.



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -138,6 +139,15 @@ public List<AuthorizationResult> authorize(
 return results;
 }

Review Comment:
   In the `authorize` function we still do `StandardAuthorizerData curData = 
data;`. I don't think we need to do this anymore.
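
For readers following along, the idiom in question gives each call one 
consistent view: a single read of the volatile field is taken up front, so 
every check in the call sees the same snapshot even while writers publish new 
ones. A minimal self-contained sketch of the pattern (illustrative names, not 
the PR's code):

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative copy-on-write holder, not the actual StandardAuthorizer.
    class CopyOnWriteHolder<T> {
        private volatile List<T> snapshot = new ArrayList<>();

        // Writers are serialized and publish a fresh copy with one write.
        synchronized void replaceAll(List<T> newItems) {
            snapshot = new ArrayList<>(newItems);
        }

        // One volatile read, so both lookups observe the same snapshot
        // even if replaceAll() runs concurrently.
        boolean containsBoth(T a, T b) {
            final List<T> cur = snapshot;
            return cur.contains(a) && cur.contains(b);
        }
    }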



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +174,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}
-log.trace("Added ACL {}: {}", id, acl);
-} catch (Throwable e) {
-log.error("addAcl error", e);
-throw e;
-}
+StandardAuthorizerData copyWithAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+Set<Uuid> removedAclIds
+) {
+return copyWithNewAcls(acls, newAclEntries, removedAclIds);
 }
 
-void removeAcl(Uuid id) {
-try {
-StandardAcl acl = aclsById.remove(id);
-if (acl == null) {
-throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+StandardAuthorizerData copyWithNewAcls(
+StandardAclWithId[] existingAcls,
+Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+Set<Uuid> removedAclIds
+) {
+StandardAclWithId[] newAcls = new StandardAclWithId[
+existingAcls.length + newAclEntries.size() - 
removedAclIds.size()];
+int numRemoved = 0, j = 0;
+for (int i = 0; i < existingAcls.length; i++) {
+StandardAclWithId aclWithId = existingAcls[i];
+if (removedAclIds.contains(aclWithId.id())) {
+numRemoved++;
+} else {
+newAcls[j++] = aclWithId;
 }
-if (!aclsByResource.remove(acl)) {
-throw new RuntimeException("Unable to remove the ACL with ID " 
+ id +
-" from aclsByResource");
+}
+if (numRemoved < 

[GitHub] [kafka] artemlivshits commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches

2022-09-14 Thread GitBox


artemlivshits commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r971097010


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
 assertEquals(1, mockRandom.get());
 
 // Produce large record, we should exceed "sticky" limit, but 
produce to this partition
-// as we switch after the "sticky" limit is exceeded.  The 
partition is switched after
-// we produce.
+// as we try to switch after the "sticky" limit is exceeded.  The 
switch is disabled
+// because of incomplete batch.
 byte[] largeValue = new byte[batchSize];
 accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, 
largeValue, Record.EMPTY_HEADERS,
 callbacks, maxBlockTimeMs, false, time.milliseconds(), 
cluster);
 assertEquals(partition1, partition.get());
-assertEquals(2, mockRandom.get());
+assertEquals(1, mockRandom.get());
 
-// Produce large record, we should switch to next partition.
+// Produce large record, we switched to next partition by previous 
produce, but

Review Comment:
   Updated.






[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971095900


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +173,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}
-log.trace("Added ACL {}: {}", id, acl);
-} catch (Throwable e) {
-log.error("addAcl error", e);
-throw e;
-}
+StandardAuthorizerData copyWithAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+Set<Uuid> removedAclIds
+) {
+return copyWithNewAcls(acls, newAclEntries, removedAclIds);
 }
 
-void removeAcl(Uuid id) {
-try {
-StandardAcl acl = aclsById.remove(id);
-if (acl == null) {
-throw new RuntimeException("ID " + id + " not found in 
aclsById.");
+StandardAuthorizerData copyWithNewAcls(
+StandardAclWithId[] existingAcls,
+Collection<Entry<Uuid, StandardAcl>> newAclEntries,
+Set<Uuid> removedAclIds
+) {
+StandardAclWithId[] newAcls = new StandardAclWithId[
+existingAcls.length + newAclEntries.size() - 
removedAclIds.size()];

Review Comment:
   good idea



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-14 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604862#comment-17604862
 ] 

Nicholas Telford commented on KAFKA-10635:
--

Hi [~guozhang],

I've managed to pull some logs from a recent occurrence of this issue. I 
specifically focused the logs on the partition and broker that produces the 
error, otherwise there would be thousands of irrelevant log messages. I've also 
replaced the name of the partitions in question with placeholder names 
({{myapp}} and {{some-processor}}), to prevent leaking confidential information.

We use a structured logging system, so I've converted the logs to CSV. I hope 
you find this format easy to understand. If you feel there's information 
missing that would help (e.g. logger name, a broader search on the logs, etc.) 
then let me know, and I'll see what I can do.

See attached [^logs.csv] 

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
> Attachments: logs.csv
>
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams app (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> rollback the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from upgrading our broker version. 
>  

[jira] [Updated] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-14 Thread Nicholas Telford (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nicholas Telford updated KAFKA-10635:
-
Attachment: logs.csv






[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971094152


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
 }
 
 @Override
-public void addAcl(Uuid id, StandardAcl acl) {
-data.addAcl(id, acl);
-}
-
-@Override
-public void removeAcl(Uuid id) {
-data.removeAcl(id);
+public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+data = data.copyWithAllNewAcls(acls.entrySet());
 }
 
 @Override
-public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-data = data.copyWithNewAcls(acls.entrySet());
+public synchronized void applyAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   However, we could use StandardAclWithId here. That would require another 
copy in the case where we were loading a snapshot, so I'm not sure if it's 
worth it.






[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971092861


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
 private final DefaultRule defaultRule;
 
 /**
- * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ * An immutable array of all the current ACLs sorted by (resource type, 
resource name).
  */
-private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   TreeSets are quite slow because they have poor memory locality. They also 
use a lot more memory. Ideally we could use something like a BTree, but Java 
doesn't have those, unfortunately...






[GitHub] [kafka] cmccabe commented on pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on PR #12636:
URL: https://github.com/apache/kafka/pull/12636#issuecomment-1247070076

   Jenkins went down again. :(
   
   ```
   java.nio.file.FileSystemException: 
/home/jenkins/workspace/Kafka_kafka-pr_PR-12636: No space left on device
   ```





[GitHub] [kafka] cmccabe commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


cmccabe commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r971091182


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
 }
 
 @Override
-public void addAcl(Uuid id, StandardAcl acl) {
-data.addAcl(id, acl);
-}
-
-@Override
-public void removeAcl(Uuid id) {
-data.removeAcl(id);
+public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+data = data.copyWithAllNewAcls(acls.entrySet());
 }
 
 @Override
-public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-data = data.copyWithNewAcls(acls.entrySet());
+public synchronized void applyAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   StandardAcl doesn't contain an ID






[GitHub] [kafka] vpapavas opened a new pull request, #12641: KAFKA-14209 : Change Topology optimization to accept list of rules 1/3

2022-09-14 Thread GitBox


vpapavas opened a new pull request, #12641:
URL: https://github.com/apache/kafka/pull/12641

   This PR is part of a series implementing the self-join rewriting. As part of 
it, we decided to clean up the `TOPOLOGY_OPTIMIZATION_CONFIG` and make it a 
list of optimization rules. Acceptable values are: NO_OPTIMIZATION, OPTIMIZE 
(which applies all optimization rules), or a comma-separated list of specific 
optimizations. A sketch of how an application sets this config follows the 
checklist below.
   
   Added unit tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
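
For context, a sketch of how an application would set the config discussed 
above (the individual rule names are whatever this PR series defines; only 
OPTIMIZE and NO_OPTIMIZATION are shown, since those exist today):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class OptimizationConfigExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Coarse switch that predates this PR: all rewrites or none.
            props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG,
                      StreamsConfig.OPTIMIZE);
            // After this PR series, a comma-separated list of individual
            // rules is also accepted (names defined by the PR).
        }
    }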
   





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


guozhangwang commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r971076310


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -575,8 +584,10 @@ public void close() throws ProcessorStateException {
 void recycle() {
 log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
-changelogReader.unregister(allChangelogs);
+if (!stateUpdaterEnabled) {
+final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
+changelogReader.unregister(allChangelogs);
+}

Review Comment:
   +100






[GitHub] [kafka] divijvaidya commented on pull request #12639: KAFKA-14233: do not init managers twice to avoid resource leak

2022-09-14 Thread GitBox


divijvaidya commented on PR #12639:
URL: https://github.com/apache/kafka/pull/12639#issuecomment-1247027848

   Wow! Great deep dive to find the root cause here @showuon 
   
   I am curious, how did you narrow down that `Gradle Test Executor 128` is 
related to `testReloadUpdatedFilesWithoutConfigChange`? Did you check the 
logs, search for exception stack traces, and manage to get the 
`IOException`? 





[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-14 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604838#comment-17604838
 ] 

Guozhang Wang commented on KAFKA-10635:
---

Hmm, okay, I think we'd need to reproduce this, which would help us get a better 
trace on the broker side. cc [~hachikuji]

In the meantime, do you happen to still have the broker-side logs from when the 
OOOSException was thrown? If yes, could you share them in the comments?





[GitHub] [kafka] junrao commented on a diff in pull request #12570: KAFKA-14156: Built-in partitioner may create suboptimal batches

2022-09-14 Thread GitBox


junrao commented on code in PR #12570:
URL: https://github.com/apache/kafka/pull/12570#discussion_r971047147


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
 assertEquals(1, mockRandom.get());
 
 // Produce large record, we should exceed "sticky" limit, but 
produce to this partition
-// as we switch after the "sticky" limit is exceeded.  The 
partition is switched after
-// we produce.
+// as we try to switch after the "sticky" limit is exceeded.  The 
switch is disabled
+// because of incomplete batch.
 byte[] largeValue = new byte[batchSize];
 accum.append(topic, RecordMetadata.UNKNOWN_PARTITION, 0L, null, 
largeValue, Record.EMPTY_HEADERS,
 callbacks, maxBlockTimeMs, false, time.milliseconds(), 
cluster);
 assertEquals(partition1, partition.get());
-assertEquals(2, mockRandom.get());
+assertEquals(1, mockRandom.get());
 
-// Produce large record, we should switch to next partition.
+// Produce large record, we switched to next partition by previous 
produce, but

Review Comment:
   To be precise, the previous produce didn't switch to the next partition. The 
produce of this record forces the closing of the current batch, which causes 
the switch to the next partition.



##
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java:
##
@@ -1129,31 +1129,34 @@ public void onCompletion(RecordMetadata metadata, 
Exception exception) {
 assertEquals(1, mockRandom.get());
 
 // Produce large record, we should exceed "sticky" limit, but 
produce to this partition
-// as we switch after the "sticky" limit is exceeded.  The 
partition is switched after
-// we produce.
+// as we try to switch after the "sticky" limit is exceeded.  The 
switch is disabled
+// because of incomplete batch.

Review Comment:
   Thanks for the explanation, Artem. This makes sense to me now.
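
For readers skimming this thread, a toy model of the switching rule under 
discussion (KIP-794's built-in partitioner); names and structure are 
illustrative, not the RecordAccumulator internals:

    import java.util.concurrent.ThreadLocalRandom;

    // Sticks to one partition until ~batch.size bytes were produced to it,
    // and, per this PR, only switches once the in-progress batch can be
    // completed, so switching does not strand a small final batch.
    class StickyPartitionModel {
        private final int batchSize;
        private final int numPartitions;
        private int stickyPartition;
        private int bytesInCurrentStint = 0;

        StickyPartitionModel(int batchSize, int numPartitions) {
            this.batchSize = batchSize;
            this.numPartitions = numPartitions;
            this.stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
        }

        int partitionFor(int recordBytes, boolean currentBatchComplete) {
            if (bytesInCurrentStint >= batchSize && currentBatchComplete) {
                stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
                bytesInCurrentStint = 0;
            }
            bytesInCurrentStint += recordBytes;
            return stickyPartition;
        }
    }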






[GitHub] [kafka] jolshan commented on a diff in pull request #12392: KAFKA-14053: Transactional producer should bump the epoch and skip ab…

2022-09-14 Thread GitBox


jolshan commented on code in PR #12392:
URL: https://github.com/apache/kafka/pull/12392#discussion_r971022152


##
clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java:
##
@@ -2594,27 +2596,20 @@ public void testDropCommitOnBatchExpiry() throws 
InterruptedException {
 } catch (ExecutionException e) {
 assertTrue(e.getCause() instanceof  TimeoutException);
 }
+
 runUntil(commitResult::isCompleted);  // the commit shouldn't be 
completed without being sent since the produce request failed.
 assertFalse(commitResult.isSuccessful());  // the commit shouldn't 
succeed since the produce request failed.
-assertThrows(TimeoutException.class, commitResult::await);
+assertThrows(KafkaException.class, commitResult::await);
 
-assertTrue(transactionManager.hasAbortableError());
-assertTrue(transactionManager.hasOngoingTransaction());
+assertTrue(transactionManager.hasFatalBumpableError());
+assertFalse(transactionManager.hasOngoingTransaction());
 assertFalse(transactionManager.isCompleting());
-assertTrue(transactionManager.transactionContainsPartition(tp0));
 
-TransactionalRequestResult abortResult = 
transactionManager.beginAbort();
-
-prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, 
producerId, epoch);
-prepareInitPidResponse(Errors.NONE, false, producerId, (short) (epoch 
+ 1));
-runUntil(abortResult::isCompleted);
-assertTrue(abortResult.isSuccessful());
-assertFalse(transactionManager.hasOngoingTransaction());
-assertFalse(transactionManager.transactionContainsPartition(tp0));
+assertThrows(KafkaException.class, () -> 
transactionManager.beginAbort());

Review Comment:
   I was thinking that for some longer-term work we could potentially distinguish 
transactions by perhaps having a bit of extra state server-side and by 
bumping the epoch after each transaction. But maybe this is too large of a 
change for now.
   
   I think you also came to the conclusion of an epoch bump but through a 
different path.






[GitHub] [kafka] guozhangwang commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


guozhangwang commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r971010379


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin implements ProcessorSupplier {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor {
+private WindowStore windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);

Review Comment:
   I see. Is that the same as:
   
   1) loop once over the larger of the two windows (this window, other window);
   2) for each record, check whether it falls in both windows: if it does, we 
emit it twice; otherwise we emit it once.
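
A runnable toy rendering of that suggestion (names and the println "emit" are 
illustrative, not the KStreamKStreamSelfJoin code; note the quoted constructor 
makes the "other" window the mirror image, with before and after swapped):

    public class SelfJoinWindowSketch {
        public static void main(String[] args) {
            long ts = 100L;                     // input record timestamp
            long beforeMs = 10L, afterMs = 5L;  // JoinWindows bounds
            long[] otherTimestamps = {92L, 96L, 103L, 108L};

            // Scan once over the wider of the two windows...
            long from = ts - Math.max(beforeMs, afterMs);
            long to = ts + Math.max(beforeMs, afterMs);
            for (long otherTs : otherTimestamps) {
                if (otherTs < from || otherTs > to) continue;
                // ...and emit once per window the record falls into;
                // a record in both windows is emitted twice.
                if (otherTs >= ts - beforeMs && otherTs <= ts + afterMs)
                    System.out.println("this-side join with " + otherTs);
                if (otherTs >= ts - afterMs && otherTs <= ts + beforeMs)
                    System.out.println("other-side join with " + otherTs);
            }
        }
    }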



##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *

[GitHub] [kafka] jsancio merged pull request #12640: MINOR; Add missing li end tag

2022-09-14 Thread GitBox


jsancio merged PR #12640:
URL: https://github.com/apache/kafka/pull/12640





[GitHub] [kafka] mumrah commented on a diff in pull request #12636: KAFKA-14214: Convert StandardAuthorizer to copy-on-write

2022-09-14 Thread GitBox


mumrah commented on code in PR #12636:
URL: https://github.com/apache/kafka/pull/12636#discussion_r970977191


##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -397,18 +387,34 @@ private MatchingAclRule findAclRule(
 return matchingAclBuilder.build();
 }
 
+/**
+ * Use a binary search to find the index of the first ACL which is greater 
than or
+ * equal to the given ACL. This may be equal to the end of the array if 
there are
+ * no such ACLs.
+ */
+private int indexOfFirstAclGreaterThanOrEqualTo(StandardAcl exemplar) {
+int i = Arrays.binarySearch(acls,
+new StandardAclWithId(Uuid.ZERO_UUID, exemplar),
+StandardAclWithId.ACL_COMPARATOR);
+// Arrays.binarySearch returns a positive number if it found an exact 
match, and
+// a negative number otherwise.

Review Comment:
   Might comment about what the negative return value indicates. It helps L404 
make more sense :)
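
For reference, the Arrays.binarySearch contract the comment refers to, in a 
self-contained sketch (plain ints instead of the PR's ACL types):

    import java.util.Arrays;

    public class BinarySearchInsertionPoint {
        public static void main(String[] args) {
            int[] sorted = {10, 20, 30, 40};

            // Exact match: the element's index is returned.
            int hit = Arrays.binarySearch(sorted, 30);   // 2

            // Miss: -(insertionPoint) - 1 is returned, so the index of the
            // first element >= the probe is recovered with -(result) - 1.
            int miss = Arrays.binarySearch(sorted, 25);  // -3
            int firstGreaterOrEqual = -miss - 1;         // 2 (element 30)

            System.out.println(hit + " " + firstGreaterOrEqual);
        }
    }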



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
 private final DefaultRule defaultRule;
 
 /**
- * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ * An immutable array of all the current ACLs sorted by (resource type, 
resource name).
  */
-private final ConcurrentSkipListSet<StandardAcl> aclsByResource;

Review Comment:
   I see we are replacing the skip-list set with a sorted array. Don't we need 
to guard against duplicates in the array?
   
   If we used a TreeSet here, it would be closer to the current implementation 
and I think it should have linear time when copying from another TreeSet



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java:
##
@@ -96,18 +97,16 @@ public void completeInitialLoad(Exception e) {
 }
 
 @Override
-public void addAcl(Uuid id, StandardAcl acl) {
-data.addAcl(id, acl);
-}
-
-@Override
-public void removeAcl(Uuid id) {
-data.removeAcl(id);
+public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
+data = data.copyWithAllNewAcls(acls.entrySet());
 }
 
 @Override
-public synchronized void loadSnapshot(Map<Uuid, StandardAcl> acls) {
-data = data.copyWithNewAcls(acls.entrySet());
+public synchronized void applyAclChanges(
+Collection<Entry<Uuid, StandardAcl>> newAcls,

Review Comment:
   Could we just take a `Collection<StandardAclWithId>` since the acl has the ID 
as a property?



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -109,14 +110,9 @@ public class StandardAuthorizerData {
 private final DefaultRule defaultRule;
 
 /**
- * Contains all of the current ACLs sorted by (resource type, resource 
name).
+ * An immutable array of all the current ACLs sorted by (resource type, 
resource name).
  */
-private final ConcurrentSkipListSet<StandardAcl> aclsByResource;
-
-/**
- * Contains all of the current ACLs indexed by UUID.
- */
-private final ConcurrentHashMap<Uuid, StandardAcl> aclsById;

Review Comment:
   Guess we don't need this anymore if we are removing `addAcl` and `removeAcl`?



##
metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerData.java:
##
@@ -182,59 +173,58 @@ StandardAuthorizerData copyWithNewConfig(int nodeId,
 loadingComplete,
 newSuperUsers,
 newDefaultResult,
-aclsByResource,
-aclsById);
+acls);
 }
 
-StandardAuthorizerData copyWithNewAcls(Collection<Entry<Uuid, StandardAcl>> aclEntries) {
-StandardAuthorizerData newData = new StandardAuthorizerData(
-log,
-aclMutator,
-loadingComplete,
-superUsers,
-defaultRule.result,
-new ConcurrentSkipListSet<>(),
-new ConcurrentHashMap<>());
-for (Entry<Uuid, StandardAcl> entry : aclEntries) {
-newData.addAcl(entry.getKey(), entry.getValue());
-}
-log.info("Applied {} acl(s) from image.", aclEntries.size());
-return newData;
+StandardAuthorizerData copyWithAllNewAcls(
+Collection<Entry<Uuid, StandardAcl>> newAclEntries
+) {
+return copyWithNewAcls(EMPTY_ACLS, newAclEntries, 
Collections.emptySet());
 }
 
-void addAcl(Uuid id, StandardAcl acl) {
-try {
-StandardAcl prevAcl = aclsById.putIfAbsent(id, acl);
-if (prevAcl != null) {
-throw new RuntimeException("An ACL with ID " + id + " already 
exists.");
-}
-if (!aclsByResource.add(acl)) {
-aclsById.remove(id);
-throw new RuntimeException("Unable to add the ACL with ID " + 
id +
-" to aclsByResource");
-}
-log.trace("Added ACL {}: {}", id, acl);
-} catch (Throwable e) {
-   

[GitHub] [kafka] jsancio opened a new pull request, #12640: MINOR; Add missing li end tag

2022-09-14 Thread GitBox


jsancio opened a new pull request, #12640:
URL: https://github.com/apache/kafka/pull/12640

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jsancio merged pull request #12635: MINOR: Mention that kraft is production ready in upgrade notes

2022-09-14 Thread GitBox


jsancio merged PR #12635:
URL: https://github.com/apache/kafka/pull/12635





[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

2022-09-14 Thread GitBox


guozhangwang commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970973727


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-  final Map<Task, Set<TopicPartition>> tasksToRecycle,
-  final Set<Task> tasksToCloseClean) {
+private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+ final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+ final Map<Task, Set<TopicPartition>> tasksToRecycle,
+ final Set<Task> tasksToCloseClean) {
+handleRunningAndSuspendedTasks(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+}
+
+private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+final Map<Task, Set<TopicPartition>> tasksToRecycle,
+final Set<Task> tasksToCloseClean) {
 for (final Task task : tasks.allTasks()) {
+if (!task.isActive()) {
+throw new IllegalStateException("Standby tasks should only be 
managed by the state updater");
+}
 final TaskId taskId = task.id();
 if (activeTasksToCreate.containsKey(taskId)) {
-if (task.isActive()) {
-final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-if (tasks.updateActiveTaskInputPartitions(task, 
topicPartitions)) {
-task.updateInputPartitions(topicPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
-}
-task.resume();
-} else {
-throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-}
+handleReAssignedActiveTask(task, 
activeTasksToCreate.get(taskId));
 activeTasksToCreate.remove(taskId);
 } else if (standbyTasksToCreate.containsKey(taskId)) {
-if (!task.isActive()) {
-throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-} else {
-tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-}
+tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
 standbyTasksToCreate.remove(taskId);
 } else {
 tasksToCloseClean.add(task);
 }
 }
 }
 
-private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-   final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-   final Map<Task, Set<TopicPartition>> tasksToRecycle,
-   final Set<Task> tasksToCloseClean) {
-classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, 
tasksToRecycle, tasksToCloseClean);
+private void handleReAssignedActiveTask(final Task task,
+final Set<TopicPartition> inputPartitions) {
+if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+}
+if (task.state() == State.SUSPENDED) {
+task.resume();
+moveTaskFromTasksRegistryToStateUpdater(task);
+}

Review Comment:
   Yes, within a single rebalance: with cooperative, the revocation and 
assignment happen at the same time, i.e. at the end of the rebalance, instead 
of the revocation happening at the beginning and the assignment at the end. So 
for a revoked partition (and hence task) we know it's definitely going to be 
reassigned under cooperative.
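   
   To make the callback timing concrete, here is a self-contained sketch (illustrative only — the listener below is made up and is not code from this PR):
   
   ```java
   import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
   import org.apache.kafka.common.TopicPartition;
   
   import java.util.Collection;
   
   // With the cooperative protocol, onPartitionsRevoked() fires at the end of
   // the rebalance, and only for partitions that are actually leaving this
   // consumer; onPartitionsAssigned() receives only the newly added partitions.
   // Under the eager protocol, everything is revoked up front and the full
   // assignment is handed back at the end.
   public class LoggingRebalanceListener implements ConsumerRebalanceListener {
       @Override
       public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
           System.out.println("Revoked at end of rebalance: " + partitions);
       }
   
       @Override
       public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
           System.out.println("Newly assigned: " + partitions);
       }
   }
   ```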



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] philipnee commented on pull request #12633: [Consumer Refactor] Background thread skeleton

2022-09-14 Thread GitBox


philipnee commented on PR #12633:
URL: https://github.com/apache/kafka/pull/12633#issuecomment-1246909476

   cc @dajac 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14132) Remaining PowerMock to Mockito tests

2022-09-14 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604794#comment-17604794
 ] 

Chris Egerton commented on KAFKA-14132:
---

[~christo_lolov] Any preference on how we should indicate which tests on this 
list have been migrated? I've just merged 
[https://github.com/apache/kafka/pull/12615] from [~yash.mayya], which takes 
care of the {{ErrorHandlingTaskTest}}, {{WorkerTaskTest}}, {{ErrorReporterTest}}, 
{{RetryWithToleranceOperatorTest}}, and {{WorkerErrantRecordReporterTest}} 
test suites.

> Remaining PowerMock to Mockito tests
> 
>
> Key: KAFKA-14132
> URL: https://issues.apache.org/jira/browse/KAFKA-14132
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Christo Lolov
>Assignee: Christo Lolov
>Priority: Major
>
> {color:#de350b}Some of the tests below use EasyMock as well. For those 
> migrate both PowerMock and EasyMock to Mockito.{color}
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still require to be moved from PowerMock to Mockito as 
> of 2nd of August 2022 which do not have a Jira issue and do not have pull 
> requests I am aware of which are opened:
>  # ErrorHandlingTaskTest (owner: Divij)
>  # SourceTaskOffsetCommiterTest (owner: Divij)
>  # WorkerMetricsGroupTest (owner: Divij)
>  # WorkerSinkTaskTest (owner: Divij)
>  # WorkerSinkTaskThreadedTest (owner: Divij)
>  # WorkerTaskTest (owner: [~yash.mayya])
>  # ErrorReporterTest (owner: [~yash.mayya])
>  # RetryWithToleranceOperatorTest (owner: [~yash.mayya])
>  # WorkerErrantRecordReporterTest (owner: [~yash.mayya])
>  # ConnectorsResourceTest
>  # StandaloneHerderTest
>  # KafkaConfigBackingStoreTest
>  # KafkaOffsetBackingStoreTest (owner: Christo) 
> ([https://github.com/apache/kafka/pull/12418])
>  # KafkaBasedLogTest
>  # RetryUtilTest
>  # RepartitionTopicTest (streams) (owner: Christo)
>  # StateManagerUtilTest (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= to 
> what the coverage is now.*



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-10802) Spurious log message when starting consumers

2022-09-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604757#comment-17604757
 ] 

François Rosière edited comment on KAFKA-10802 at 9/14/22 3:02 PM:
---

The log is still present at INFO level when using Kafka 3.2.1. Possible 
regression? Is there anything to do to avoid this kind of issue?
{code:java}
[Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request 
joining group due to: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException){code}


was (Author: JIRAUSER288866):
The issue is still present as an INFO log when using Kafka 3.2.1. Possible 
regression? Is there anything to do to avoid this kind of issue?
{code:java}
[Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request 
joining group due to: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException){code}

> Spurious log message when starting consumers
> 
>
> Key: KAFKA-10802
> URL: https://issues.apache.org/jira/browse/KAFKA-10802
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.6.0
>Reporter: Mickael Maison
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.7.0, 2.6.1, 2.8.0
>
>
> Reported by Gary Russell in the [2.6.1 RC3 vote 
> thread|https://lists.apache.org/thread.html/r13d2c687b2fafbe9907fceb3d4f3cc6d5b34f9f36a7fcc985c38b506%40%3Cdev.kafka.apache.org%3E]
> I am seeing this on every consumer start:
> 2020-11-25 13:54:34.858  INFO 42250 --- [ntainer#0-0-C-1] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
> clientId=consumer-ktest26int-1, groupId=ktest26int] Rebalance failed.
> org.apache.kafka.common.errors.MemberIdRequiredException: The group member 
> needs to have a valid member id before actually entering a consumer group.
> Due to this change 
> https://github.com/apache/kafka/commit/16ec1793d53700623c9cb43e711f585aafd44dd4#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R468
> I understand what a MemberIdRequiredException is, but the previous (2.6.0) 
> log (with exception.getMessage()) didn't stand out like the new one does 
> because it was all on one line.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante merged pull request #12615: KAFKA-14132: Migrate some Connect tests from EasyMock/PowerMock to Mockito

2022-09-14 Thread GitBox


C0urante merged PR #12615:
URL: https://github.com/apache/kafka/pull/12615


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-10802) Spurious log message when starting consumers

2022-09-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604757#comment-17604757
 ] 

François Rosière edited comment on KAFKA-10802 at 9/14/22 2:33 PM:
---

The issue is still present as an INFO log when using Kafka 3.2.1. Possible 
regression? Is there anything to do to avoid this kind of issue?
{code:java}
[Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request 
joining group due to: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException){code}


was (Author: JIRAUSER288866):
The issue is still present when using Kafka 3.2.1. Possible regression? Is 
there anything to do to avoid this kind of issue?
{code:java}
[Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request 
joining group due to: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException){code}

> Spurious log message when starting consumers
> 
>
> Key: KAFKA-10802
> URL: https://issues.apache.org/jira/browse/KAFKA-10802
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.6.0
>Reporter: Mickael Maison
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.7.0, 2.6.1, 2.8.0
>
>
> Reported by Gary Russell in the [2.6.1 RC3 vote 
> thread|https://lists.apache.org/thread.html/r13d2c687b2fafbe9907fceb3d4f3cc6d5b34f9f36a7fcc985c38b506%40%3Cdev.kafka.apache.org%3E]
> I am seeing this on every consumer start:
> 2020-11-25 13:54:34.858  INFO 42250 --- [ntainer#0-0-C-1] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
> clientId=consumer-ktest26int-1, groupId=ktest26int] Rebalance failed.
> org.apache.kafka.common.errors.MemberIdRequiredException: The group member 
> needs to have a valid member id before actually entering a consumer group.
> Due to this change 
> https://github.com/apache/kafka/commit/16ec1793d53700623c9cb43e711f585aafd44dd4#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R468
> I understand what a MemberIdRequiredException is, but the previous (2.6.0) 
> log (with exception.getMessage()) didn't stand out like the new one does 
> because it was all on one line.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10802) Spurious log message when starting consumers

2022-09-14 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-10802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604757#comment-17604757
 ] 

François Rosière commented on KAFKA-10802:
--

The issue is still present when using Kafka 3.2.1. Possible regression? Is 
there anything to do to avoid this kind of issue?
{code:java}
[Consumer clientId=consumer-MY_LISTENER_ID-9, groupId=MY_LISTENER_ID] Request 
joining group due to: rebalance failed due to 'The group member needs to have a 
valid member id before actually entering a consumer group.' 
(MemberIdRequiredException){code}

> Spurious log message when starting consumers
> 
>
> Key: KAFKA-10802
> URL: https://issues.apache.org/jira/browse/KAFKA-10802
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.6.0
>Reporter: Mickael Maison
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.7.0, 2.6.1, 2.8.0
>
>
> Reported by Gary Russell in the [2.6.1 RC3 vote 
> thread|https://lists.apache.org/thread.html/r13d2c687b2fafbe9907fceb3d4f3cc6d5b34f9f36a7fcc985c38b506%40%3Cdev.kafka.apache.org%3E]
> I am seeing this on every consumer start:
> 2020-11-25 13:54:34.858  INFO 42250 --- [ntainer#0-0-C-1] 
> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer 
> clientId=consumer-ktest26int-1, groupId=ktest26int] Rebalance failed.
> org.apache.kafka.common.errors.MemberIdRequiredException: The group member 
> needs to have a valid member id before actually entering a consumer group.
> Due to this change 
> https://github.com/apache/kafka/commit/16ec1793d53700623c9cb43e711f585aafd44dd4#diff-15efe9b844f78b686393b6c2e2ad61306c3473225742caed05c7edab9a138832R468
> I understand what a MemberIdRequiredException is, but the previous (2.6.0) 
> log (with exception.getMessage()) didn't stand out like the new one does 
> because it was all on one line.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] C0urante commented on pull request #11792: Replace EasyMock/PowerMock with Mockito in DistributedHerderTest

2022-09-14 Thread GitBox


C0urante commented on PR #11792:
URL: https://github.com/apache/kafka/pull/11792#issuecomment-1246858578

   Hi @dplavcic, are you still working on this issue?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604716#comment-17604716
 ] 

Chris Egerton commented on KAFKA-14220:
---

[~kumarpritam863] this seemed like a legitimate issue; is there a reason it's 
been closed and the description has been cleared?

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor<K, V1, K, VOut> get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor<K, V1, K, VOut> {
+private WindowStore<K, V2> windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext<K, VOut> context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record<K, V1> record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);

Review Comment:
   They are not the same: the first uses the `joinThisBeforeMs` and the second 
uses the `joinOtherBeforeMs`. This is needed as the inner join uses different 
intervals when fetching rows from the window store based on whether it is the 
left or right-hand side. Since we want the self-join to match the output of the 
inner-join, I followed the same logic.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970805661


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor<K, V1, K, VOut> get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor<K, V1, K, VOut> {
+private WindowStore<K, V2> windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext<K, VOut> context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record<K, V1> record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);
+long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+boolean emittedJoinWithSelf = false;
+final Record<K, VOut> selfRecord = record
+.withValue(joinerThis.apply(record.key(), record.value(), (V2) 
record.value()))
+.withTimestamp(inputRecordTimestamp);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+// Join current record with other
+try (final WindowStoreIterator<V2> iter = windowStore.fetch(
+record.key(), timeFrom, timeTo)) {
+while (iter.hasNext()) {
+final KeyValue<Long, V2> otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Join this with other
+context().forward(
+

[GitHub] [kafka] showuon commented on pull request #12639: KAFKA-14233: do not init managers twice to avoid resource leak

2022-09-14 Thread GitBox


showuon commented on PR #12639:
URL: https://github.com/apache/kafka/pull/12639#issuecomment-1246762358

   @hachikuji @jsancio, please take a look. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon opened a new pull request, #12639: KAFKA-14233: do not init managers twice to avoid resource leak

2022-09-14 Thread GitBox


showuon opened a new pull request, #12639:
URL: https://github.com/apache/kafka/pull/12639

   Recently, a lot of builds have failed (and been terminated) with a 
core:unitTest failure. The failure messages look like this:
   ```
   FAILURE: Build failed with an exception.
   [2022-09-14T09:51:52.190Z] 
   [2022-09-14T09:51:52.190Z] * What went wrong:
   [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
   [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished 
with non-zero exit value 1
   ```
   
   After investigation, I found one reason for it (there may be others). In the 
`BrokerMetadataPublisherTest#testReloadUpdatedFilesWithoutConfigChange` test, 
we create the logManager twice, but during cleanup we only close one of them. 
As a result, a log cleaner keeps running after the test; by that time the temp 
log dirs have already been deleted, so it calls `Exit.halt(1)` and we get the 
error we saw in Gradle, just as this code does when we encounter an IOException 
in all our log dirs:
   
   ```
   fatal(s"Shutdown broker because all log dirs in ${logDirs.mkString(", ")} 
have failed")
   Exit.halt(1)
   ```
   
   Fixed it by disabling the `_firstPublish` flag for the mock publisher, to 
avoid the resource leak.
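   
   To make the failure mode concrete, here is a minimal, self-contained model of the leak (all names below are invented for illustration — this is not the actual test code):
   
   ```java
   // Stand-in for LogManager: owns a background "cleaner" thread that must be
   // shut down explicitly, otherwise it outlives the test.
   public final class LeakDemo {
       static final class FakeLogManager implements AutoCloseable {
           private final Thread cleaner = new Thread(() -> {
               while (!Thread.currentThread().isInterrupted()) {
                   try {
                       Thread.sleep(100); // pretend to scan the log dirs
                   } catch (final InterruptedException e) {
                       return;
                   }
               }
           });
   
           FakeLogManager() {
               cleaner.setDaemon(false);
               cleaner.start();
           }
   
           @Override
           public void close() {
               cleaner.interrupt();
           }
       }
   
       public static void main(final String[] args) {
           final FakeLogManager first = new FakeLogManager();  // created in setup
           final FakeLogManager second = new FakeLogManager(); // created again on first publish
           second.close();
           // 'first' is never closed: its non-daemon cleaner thread keeps the JVM
           // alive, mirroring the leaked log cleaner that later calls Exit.halt(1).
       }
   }
   ```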
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970800187


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor<K, V1, K, VOut> get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor<K, V1, K, VOut> {
+private WindowStore<K, V2> windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext<K, VOut> context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record<K, V1> record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);

Review Comment:
   They are not the same: the first uses the `joinThisBeforeMs` and the second 
uses the `joinOtherBeforeMs`. This is needed as the inner join (for a reason I 
have not understood cc @vvcephei ) uses different intervals when fetching rows 
from the window store based on whether it is the left or right-hand side. Since 
we want the self-join to match the output of the inner-join, I followed the 
same logic.  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vpapavas commented on a diff in pull request #12555: Optimize self-join

2022-09-14 Thread GitBox


vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r970792491


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamSelfJoin.java:
##
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import static 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
+import org.apache.kafka.streams.kstream.internals.KStreamImplJoin.TimeTracker;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class KStreamKStreamSelfJoin<K, V1, V2, VOut> implements ProcessorSupplier<K, V1, K, VOut> {
+private static final Logger LOG = 
LoggerFactory.getLogger(KStreamKStreamSelfJoin.class);
+
+private final String windowName;
+private final long joinThisBeforeMs;
+private final long joinThisAfterMs;
+private final long joinOtherBeforeMs;
+private final long joinOtherAfterMs;
+private final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis;
+
+private final TimeTracker sharedTimeTracker;
+
+KStreamKStreamSelfJoin(
+final String windowName,
+final JoinWindowsInternal windows,
+final ValueJoinerWithKey<? super K, ? super V1, ? super V2, ? extends VOut> joinerThis,
+final TimeTracker sharedTimeTracker) {
+
+this.windowName = windowName;
+this.joinThisBeforeMs = windows.beforeMs;
+this.joinThisAfterMs = windows.afterMs;
+this.joinOtherBeforeMs = windows.afterMs;
+this.joinOtherAfterMs = windows.beforeMs;
+this.joinerThis = joinerThis;
+this.sharedTimeTracker = sharedTimeTracker;
+}
+
+@Override
+public Processor<K, V1, K, VOut> get() {
+return new KStreamKStreamSelfJoinProcessor();
+}
+
+private class KStreamKStreamSelfJoinProcessor extends 
ContextualProcessor<K, V1, K, VOut> {
+private WindowStore<K, V2> windowStore;
+private Sensor droppedRecordsSensor;
+
+@Override
+public void init(final ProcessorContext<K, VOut> context) {
+super.init(context);
+
+final StreamsMetricsImpl metrics = (StreamsMetricsImpl) 
context.metrics();
+droppedRecordsSensor = 
droppedRecordsSensor(Thread.currentThread().getName(), 
context.taskId().toString(), metrics);
+windowStore = context.getStateStore(windowName);
+}
+
+@SuppressWarnings("unchecked")
+@Override
+public void process(final Record<K, V1> record) {
+if (StreamStreamJoinUtil.skipRecord(record, LOG, 
droppedRecordsSensor, context())) {
+return;
+}
+
+final long inputRecordTimestamp = record.timestamp();
+long timeFrom = Math.max(0L, inputRecordTimestamp - 
joinThisBeforeMs);
+long timeTo = Math.max(0L, inputRecordTimestamp + joinThisAfterMs);
+boolean emittedJoinWithSelf = false;
+final Record<K, VOut> selfRecord = record
+.withValue(joinerThis.apply(record.key(), record.value(), (V2) 
record.value()))
+.withTimestamp(inputRecordTimestamp);
+sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
+
+// Join current record with other
+try (final WindowStoreIterator<V2> iter = windowStore.fetch(
+record.key(), timeFrom, timeTo)) {
+while (iter.hasNext()) {
+final KeyValue<Long, V2> otherRecord = iter.next();
+final long otherRecordTimestamp = otherRecord.key;
+
+// Join this with other
+context().forward(
+

[jira] [Updated] (KAFKA-14233) Jenkins build failed with task ':core:unitTest' exit value 1

2022-09-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14233:
--
Description: 
Recently, a lot of builds have failed (and been terminated) with a 
core:unitTest failure. The failure messages look like this:
{code:java}
FAILURE: Build failed with an exception.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * What went wrong:
[2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
[2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with 
non-zero exit value 1
[2022-09-14T09:51:52.190Z]   This problem might be caused by incorrect test 
process configuration.
[2022-09-14T09:51:52.190Z]   Please refer to the test execution section in the 
User Manual at 
https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Try:
[2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack 
trace.
[2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log 
output.
[2022-09-14T09:51:52.190Z] > Run with --scan to get full insights.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}

  was:
Recently, a lot of builds have failed (and been terminated). The failure 
messages look like this:
{code:java}
FAILURE: Build failed with an exception.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * What went wrong:
[2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
[2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with 
non-zero exit value 1
[2022-09-14T09:51:52.190Z]   This problem might be caused by incorrect test 
process configuration.
[2022-09-14T09:51:52.190Z]   Please refer to the test execution section in the 
User Manual at 
https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Try:
[2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack 
trace.
[2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log 
output.
[2022-09-14T09:51:52.190Z] > Run with --scan to get full insights.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}


> Jenkins build failed with task ':core:unitTest' exit value 1
> 
>
> Key: KAFKA-14233
> URL: https://issues.apache.org/jira/browse/KAFKA-14233
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Recently, a lot of builds have failed (and been terminated) with a 
> core:unitTest failure. The failure messages look like this:
> {code:java}
> FAILURE: Build failed with an exception.
> [2022-09-14T09:51:52.190Z] 
> [2022-09-14T09:51:52.190Z] * What went wrong:
> [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
> [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with 
> non-zero exit value 1
> [2022-09-14T09:51:52.190Z]   This problem might be caused by incorrect test 
> process configuration.
> [2022-09-14T09:51:52.190Z]   Please refer to the test execution section in 
> the User Manual at 
> https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
> [2022-09-14T09:51:52.190Z] 
> [2022-09-14T09:51:52.190Z] * Try:
> [2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack 
> trace.
> [2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more 
> log output.
> [2022-09-14T09:51:52.190Z] > Run with --scan to get full insights.
> [2022-09-14T09:51:52.190Z] 
> [2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org
> [2022-09-14T09:51:52.190Z] 
> [2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14233) Jenkins build failed with task ':core:unitTest' exit value 1

2022-09-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14233:
--
Description: 
Recently, a lot of builds have failed (and been terminated). The failure 
messages look like this:
{code:java}
FAILURE: Build failed with an exception.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * What went wrong:
[2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
[2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with 
non-zero exit value 1
[2022-09-14T09:51:52.190Z]   This problem might be caused by incorrect test 
process configuration.
[2022-09-14T09:51:52.190Z]   Please refer to the test execution section in the 
User Manual at 
https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Try:
[2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack 
trace.
[2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log 
output.
[2022-09-14T09:51:52.190Z] > Run with --scan to get full insights.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}

  was:
Failed messages look like this:
{code:java}
FAILURE: Build failed with an exception.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * What went wrong:
[2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
[2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with 
non-zero exit value 1
[2022-09-14T09:51:52.190Z]   This problem might be caused by incorrect test 
process configuration.
[2022-09-14T09:51:52.190Z]   Please refer to the test execution section in the 
User Manual at 
https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Try:
[2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack 
trace.
[2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log 
output.
[2022-09-14T09:51:52.190Z] > Run with --scan to get full insights.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}


> Jenkins build failed with task ':core:unitTest' exit value 1
> 
>
> Key: KAFKA-14233
> URL: https://issues.apache.org/jira/browse/KAFKA-14233
> Project: Kafka
>  Issue Type: Test
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> Recently, a lot of builds have failed (and been terminated). The failure 
> messages look like this:
> {code:java}
> FAILURE: Build failed with an exception.
> [2022-09-14T09:51:52.190Z] 
> [2022-09-14T09:51:52.190Z] * What went wrong:
> [2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
> [2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with 
> non-zero exit value 1
> [2022-09-14T09:51:52.190Z]   This problem might be caused by incorrect test 
> process configuration.
> [2022-09-14T09:51:52.190Z]   Please refer to the test execution section in 
> the User Manual at 
> https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
> [2022-09-14T09:51:52.190Z] 
> [2022-09-14T09:51:52.190Z] * Try:
> [2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack 
> trace.
> [2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more 
> log output.
> [2022-09-14T09:51:52.190Z] > Run with --scan to get full insights.
> [2022-09-14T09:51:52.190Z] 
> [2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org
> [2022-09-14T09:51:52.190Z] 
> [2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14233) Jenkins build failed with task ':core:unitTest' exit value 1

2022-09-14 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14233:
-

 Summary: Jenkins build failed with task ':core:unitTest' exit 
value 1
 Key: KAFKA-14233
 URL: https://issues.apache.org/jira/browse/KAFKA-14233
 Project: Kafka
  Issue Type: Test
Reporter: Luke Chen
Assignee: Luke Chen


Failed messages look like this:
{code:java}
FAILURE: Build failed with an exception.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * What went wrong:
[2022-09-14T09:51:52.190Z] Execution failed for task ':core:unitTest'.
[2022-09-14T09:51:52.190Z] > Process 'Gradle Test Executor 128' finished with 
non-zero exit value 1
[2022-09-14T09:51:52.190Z]   This problem might be caused by incorrect test 
process configuration.
[2022-09-14T09:51:52.190Z]   Please refer to the test execution section in the 
User Manual at 
https://docs.gradle.org/7.5.1/userguide/java_testing.html#sec:test_execution
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Try:
[2022-09-14T09:51:52.190Z] > Run with --stacktrace option to get the stack 
trace.
[2022-09-14T09:51:52.190Z] > Run with --info or --debug option to get more log 
output.
[2022-09-14T09:51:52.190Z] > Run with --scan to get full insights.
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] * Get more help at https://help.gradle.org
[2022-09-14T09:51:52.190Z] 
[2022-09-14T09:51:52.190Z] BUILD FAILED in 2h 36m 27s{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-7109) KafkaConsumer should close its incremental fetch sessions on close

2022-09-14 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-7109:
-
Fix Version/s: 3.3.1

> KafkaConsumer should close its incremental fetch sessions on close
> --
>
> Key: KAFKA-7109
> URL: https://issues.apache.org/jira/browse/KAFKA-7109
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Assignee: Divij Vaidya
>Priority: Minor
>  Labels: new-consumer-threading-should-fix
> Fix For: 3.4.0, 3.3.1
>
>
> KafkaConsumer should close its incremental fetch sessions on close.  
> Currently, the sessions are not closed, but simply time out once the consumer 
> is gone.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] showuon merged pull request #11516: MINOR: Use MessageDigest equals when comparing signature

2022-09-14 Thread GitBox


showuon merged PR #11516:
URL: https://github.com/apache/kafka/pull/11516


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #11516: MINOR: Use MessageDigest equals when comparing signature

2022-09-14 Thread GitBox


showuon commented on PR #11516:
URL: https://github.com/apache/kafka/pull/11516#issuecomment-1246713341

   Failed tests are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / 
integration.kafka.server.FetchRequestBetweenDifferentIbpTest.testControllerNewToOldIBP()
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, 
Name=testAllTopicPartition, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testPathToJsonFile, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, 
Name=testAllTopicPartition, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
kafka.admin.LeaderElectionCommandTest.[1] Type=Raft, Name=testTopicPartition, 
Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / kafka.server.ReplicaManagerTest.[1] 
usesTopicIds=true
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

2022-09-14 Thread GitBox


cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970668525


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -421,73 +421,95 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-  final Map<Task, Set<TopicPartition>> tasksToRecycle,
-  final Set<Task> tasksToCloseClean) {
+private void handleTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+ final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+ final Map<Task, Set<TopicPartition>> tasksToRecycle,
+ final Set<Task> tasksToCloseClean) {
+handleRunningAndSuspendedTasks(activeTasksToCreate, 
standbyTasksToCreate, tasksToRecycle, tasksToCloseClean);
+handleTasksInStateUpdater(activeTasksToCreate, standbyTasksToCreate);
+}
+
+private void handleRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+final Map<Task, Set<TopicPartition>> tasksToRecycle,
+final Set<Task> tasksToCloseClean) {
 for (final Task task : tasks.allTasks()) {
+if (!task.isActive()) {
+throw new IllegalStateException("Standby tasks should only be 
managed by the state updater");
+}
 final TaskId taskId = task.id();
 if (activeTasksToCreate.containsKey(taskId)) {
-if (task.isActive()) {
-final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-if (tasks.updateActiveTaskInputPartitions(task, 
topicPartitions)) {
-task.updateInputPartitions(topicPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
-}
-task.resume();
-} else {
-throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-}
+handleReAssignedActiveTask(task, 
activeTasksToCreate.get(taskId));
 activeTasksToCreate.remove(taskId);
 } else if (standbyTasksToCreate.containsKey(taskId)) {
-if (!task.isActive()) {
-throw new IllegalStateException("Standby tasks should only 
be managed by the state updater");
-} else {
-tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-}
+tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
 standbyTasksToCreate.remove(taskId);
 } else {
 tasksToCloseClean.add(task);
 }
 }
 }
 
-private void classifyTasksWithStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-   final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-   final Map<Task, Set<TopicPartition>> tasksToRecycle,
-   final Set<Task> tasksToCloseClean) {
-classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, 
tasksToRecycle, tasksToCloseClean);
+private void handleReAssignedActiveTask(final Task task,
+final Set<TopicPartition> inputPartitions) {
+if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+task.updateInputPartitions(inputPartitions, 
topologyMetadata.nodeToSourceTopics(task.id()));
+}
+if (task.state() == State.SUSPENDED) {
+task.resume();
+moveTaskFromTasksRegistryToStateUpdater(task);
+}

Review Comment:
   Could you elaborate why a suspended task cannot be reassigned as active with 
the cooperative assignor? Is this guaranteed?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-10635) Streams application fails with OutOfOrderSequenceException after rolling restarts of brokers

2022-09-14 Thread Nicholas Telford (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17604021#comment-17604021
 ] 

Nicholas Telford commented on KAFKA-10635:
--

[~guozhang] Both my Kafka brokers and all clients run 3.2.0.

The original issue reporter is running 2.5.1.

> Streams application fails with OutOfOrderSequenceException after rolling 
> restarts of brokers
> 
>
> Key: KAFKA-10635
> URL: https://issues.apache.org/jira/browse/KAFKA-10635
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 2.5.1
>Reporter: Peeraya Maetasatidsuk
>Priority: Blocker
>
> We are upgrading our brokers to version 2.5.1 (from 2.3.1) by performing a 
> rolling restart of the brokers after installing the new version. After the 
> restarts we notice one of our streams apps (client version 2.4.1) fails with 
> OutOfOrderSequenceException:
>  
> {code:java}
> ERROR [2020-10-13 22:52:21,400] [com.aaa.bbb.ExceptionHandler] Unexpected 
> error. Record: a_record, destination topic: 
> topic-name-Aggregation-repartition 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> ERROR [2020-10-13 22:52:21,413] 
> [org.apache.kafka.streams.processor.internals.AssignedTasks] stream-thread 
> [topic-name-StreamThread-1] Failed to commit stream task 1_39 due to the 
> following error: org.apache.kafka.streams.errors.StreamsException: task 
> [1_39] Abort sending since an error caught with a previous record (timestamp 
> 1602654659000) to topic topic-name-Aggregation-repartition due to 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:144)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:52)
> at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:204)
> at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1348)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:230)
> at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:196)
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:730) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:716) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:674)
> at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:596)
> at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
>    at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:798)
> at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)   
>  at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)at 
> org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335)   
>  at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)   
>  at java.base/java.lang.Thread.run(Thread.java:834)Caused by: 
> org.apache.kafka.common.errors.OutOfOrderSequenceException: The broker 
> received an out of order sequence number.
> {code}
> We see a corresponding error on the broker side:
> {code:java}
> [2020-10-13 22:52:21,398] ERROR [ReplicaManager broker=137636348] Error 
> processing append operation on partition 
> topic-name-Aggregation-repartition-52  
> (kafka.server.ReplicaManager)org.apache.kafka.common.errors.OutOfOrderSequenceException:
>  Out of order sequence number for producerId 2819098 at offset 1156041 in 
> partition topic-name-Aggregation-repartition-52: 29 (incoming seq. number), 
> -1 (current end sequence number)
> {code}
> We are able to reproduce this many times and it happens regardless of whether 
> the broker shutdown (at restart) is clean or unclean. However, when we 
> roll back the broker version to 2.3.1 from 2.5.1 and perform similar rolling 
> restarts, we don't see this error on the streams application at all. This is 
> blocking us from upgrading our broker version. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

2022-09-14 Thread GitBox


cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r970637424


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -103,42 +101,75 @@ public void addPendingStandbyTasksToCreate(final Map<TaskId, Set<TopicPartition>>
 
     @Override
     public Set<TopicPartition> removePendingTaskToRecycle(final TaskId taskId) {
-        return pendingTasksToRecycle.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.RECYCLE)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }
 
     @Override
     public void addPendingTaskToRecycle(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
-        pendingTasksToRecycle.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createRecycleTask(inputPartitions));
     }
 
     @Override
     public Set<TopicPartition> removePendingTaskToUpdateInputPartitions(final TaskId taskId) {
-        return pendingTasksToUpdateInputPartitions.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.UPDATE_INPUT_PARTITIONS)) {
+            return pendingUpdateActions.remove(taskId).getInputPartitions();
+        }
+        return null;
     }
 
     @Override
     public void addPendingTaskToUpdateInputPartitions(final TaskId taskId, final Set<TopicPartition> inputPartitions) {
-        pendingTasksToUpdateInputPartitions.put(taskId, inputPartitions);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createUpdateInputPartition(inputPartitions));
     }
 
     @Override
     public boolean removePendingTaskToCloseDirty(final TaskId taskId) {
-        return pendingTasksToCloseDirty.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_DIRTY)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
     }
 
     @Override
     public void addPendingTaskToCloseDirty(final TaskId taskId) {
-        pendingTasksToCloseDirty.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseDirty());
     }
 
     @Override
     public boolean removePendingTaskToCloseClean(final TaskId taskId) {
-        return pendingTasksToCloseClean.remove(taskId);
+        if (containsTaskIdWithAction(taskId, Action.CLOSE_CLEAN)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
    }
 
     @Override
     public void addPendingTaskToCloseClean(final TaskId taskId) {
-        pendingTasksToCloseClean.add(taskId);
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createCloseClean());
+    }
+
+    @Override
+    public boolean removePendingActiveTaskToSuspend(final TaskId taskId) {
+        if (containsTaskIdWithAction(taskId, Action.SUSPEND)) {
+            pendingUpdateActions.remove(taskId);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public void addPendingActiveTaskToSuspend(final TaskId taskId) {
+        pendingUpdateActions.put(taskId, PendingUpdateAction.createSuspend());
+    }
+
+    private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
+        final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
+        return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);

Review Comment:
   Done in https://github.com/apache/kafka/pull/12638






[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970637017


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##
@@ -169,7 +169,7 @@ public void addPendingActiveTaskToSuspend(final TaskId taskId) {
 
     private boolean containsTaskIdWithAction(final TaskId taskId, final Action action) {
         final PendingUpdateAction pendingUpdateAction = pendingUpdateActions.get(taskId);
-        return !(pendingUpdateAction == null || pendingUpdateAction.getAction() != action);
+        return pendingUpdateAction != null && pendingUpdateAction.getAction() == action;

Review Comment:
   Follow up from 
https://github.com/apache/kafka/pull/12600#discussion_r970078065






[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970631567


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogRegister.java:
##
@@ -31,6 +33,8 @@ public interface ChangelogRegister {
      */
     void register(final TopicPartition partition, final ProcessorStateManager stateManager);
 
+    void register(final Set<TopicPartition> partition, final ProcessorStateManager stateManager);

Review Comment:
   This is not strictly needed, but I thought it makes registering and 
unregistering a bit more symmetric.
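
   Not necessarily the actual signature, but the shape of such a bulk overload 
can be as small as a default method that delegates to the single-partition 
variant. A sketch (here `Object` stands in for `ProcessorStateManager` only to 
keep the snippet self-contained):

```java
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Sketch only: a bulk register() that keeps the interface symmetric with a
// bulk unregister(). Object stands in for ProcessorStateManager so the
// snippet compiles without the streams internals on the classpath.
interface ChangelogRegisterSketch {
    void register(TopicPartition partition, Object stateManager);

    default void register(Set<TopicPartition> partitions, Object stateManager) {
        // Delegating per partition keeps the two overloads trivially consistent.
        partitions.forEach(partition -> register(partition, stateManager));
    }
}
```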






[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970630607


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -536,7 +543,9 @@ public void flushCache() {
     public void close() throws ProcessorStateException {
         log.debug("Closing its state manager and all the registered state stores: {}", stores);
 
-        changelogReader.unregister(getAllChangelogTopicPartitions());
+        if (!stateUpdaterEnabled) {
+            changelogReader.unregister(getAllChangelogTopicPartitions());
+        }

Review Comment:
   Only unregister changelogs if the state updater is disabled.






[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970630206


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -209,7 +212,9 @@ void registerStateStores(final List<StateStore> allStores, final InternalProcess
     processorContext.uninitialize();
     for (final StateStore store : allStores) {
         if (stores.containsKey(store.name())) {
-            maybeRegisterStoreWithChangelogReader(store.name());
+            if (!stateUpdaterEnabled) {
+                maybeRegisterStoreWithChangelogReader(store.name());
+            }

Review Comment:
   Only register changelogs if the state updater is disabled.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -352,7 +357,9 @@ public void registerStore(final StateStore store,
     // on the state manager this state store would be closed as well
     stores.put(storeName, storeMetadata);
 
-    maybeRegisterStoreWithChangelogReader(storeName);
+    if (!stateUpdaterEnabled) {
+        maybeRegisterStoreWithChangelogReader(storeName);
+    }

Review Comment:
   Only register changelogs if the state updater is disabled.






[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970629341


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java:
##
@@ -575,8 +584,10 @@ public void close() throws ProcessorStateException {
     void recycle() {
         log.debug("Recycling state for {} task {}.", taskType, taskId);
 
-        final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
-        changelogReader.unregister(allChangelogs);
+        if (!stateUpdaterEnabled) {
+            final List<TopicPartition> allChangelogs = getAllChangelogTopicPartitions();
+            changelogReader.unregister(allChangelogs);
+        }

Review Comment:
   Note that once we have only the state updater, recycling a state manager 
becomes a no-op.






[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970627977


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -406,17 +412,21 @@ private void maybeCheckpointUpdatingTasks(final long now) {
     private StateUpdaterThread stateUpdaterThread = null;
     private CountDownLatch shutdownGate;
 
-    public DefaultStateUpdater(final StreamsConfig config,
+    private String name;
+
+    public DefaultStateUpdater(final String name,

Review Comment:
   Added a name for the state updater to improve log messages.
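
   For illustration, one common way to bake such a name into every log line is 
Kafka's `LogContext` utility. A sketch (not necessarily the PR's exact wiring):

```java
import org.apache.kafka.common.utils.LogContext;
import org.slf4j.Logger;

// Sketch only: every message logged through `log` carries the updater's
// name as a prefix, which makes interleaved logs from several instances
// easy to tell apart.
class NamedStateUpdaterSketch {
    private final Logger log;

    NamedStateUpdaterSketch(final String name) {
        this.log = new LogContext(String.format("state-updater [%s] ", name))
            .logger(NamedStateUpdaterSketch.class);
    }

    void start() {
        log.info("State updater started"); // "state-updater [my-name] State updater started"
    }
}
```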






[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970626576


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -262,19 +263,19 @@ private List<TaskAndAction> getTasksAndActions() {
     private void addTask(final Task task) {
         if (isStateless(task)) {
             addToRestoredTasks((StreamTask) task);
-            log.debug("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+            log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");

Review Comment:
   I changed a couple of log messages from debug to info to better track tasks. 
 






[GitHub] [kafka] cadonna commented on a diff in pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on code in PR #12638:
URL: https://github.com/apache/kafka/pull/12638#discussion_r970625278


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -98,7 +99,7 @@ public void run() {
     while (isRunning.get()) {
         try {
             runOnce();
-        } catch (final InterruptedException interruptedException) {
+        } catch (final InterruptedException | InterruptException interruptedException) {

Review Comment:
   The restore consumer might throw an `InterruptException` when the state 
updater is shut down.
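
   For illustration, a minimal sketch of a shutdown-aware loop that treats both 
Java's checked `InterruptedException` and Kafka's unchecked 
`org.apache.kafka.common.errors.InterruptException` as a signal to stop 
(`runOnce` and the class name are hypothetical):

```java
import org.apache.kafka.common.errors.InterruptException;

public class UpdaterLoopSketch implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                runOnce(); // hypothetical unit of work, e.g. polling a restore consumer
            } catch (final InterruptedException | InterruptException e) {
                // The consumer wraps thread interruption in Kafka's unchecked
                // InterruptException; both exceptions mean we are shutting down.
                running = false;
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
        }
    }

    private void runOnce() throws InterruptedException {
        // do one unit of work
    }
}
```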






[GitHub] [kafka] cadonna commented on pull request #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna commented on PR #12638:
URL: https://github.com/apache/kafka/pull/12638#issuecomment-1246558345

   Call for review: @wcarlson5 @lihaosky 





[GitHub] [kafka] cadonna opened a new pull request, #12638: Register and unregister changelog topics in state updater

2022-09-14 Thread GitBox


cadonna opened a new pull request, #12638:
URL: https://github.com/apache/kafka/pull/12638

   Registering and unregistering the changelog topics in the changelog reader 
outside of the state updater leads to race conditions between the stream thread 
and the state updater thread. Thus, this PR moves registering and unregistering 
of changelog topics in the changelog reader into the state updater if the state 
updater is enabled.
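
   As a general pattern (a sketch with hypothetical names, not the PR's actual 
code), such a fix amounts to thread confinement: the stream thread stops 
mutating the changelog registry directly and instead hands the work to the 
state updater thread through a queue:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of thread confinement: only the updater thread ever touches the
// (non-thread-safe) changelog registry, so the stream thread and the
// updater thread can no longer race on it.
class StateUpdaterConfinementSketch {
    private final BlockingQueue<Runnable> actions = new LinkedBlockingQueue<>();

    // Called by the stream thread: enqueue the registration instead of
    // performing it directly.
    void addTask(final String taskId) {
        actions.add(() -> registerChangelogs(taskId));
    }

    // Runs on the state updater thread only.
    void runOnce() throws InterruptedException {
        actions.take().run();
    }

    private void registerChangelogs(final String taskId) {
        // single-threaded access to the registry happens here
    }
}
```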
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] naanagon commented on pull request #11516: MINOR: Use MessageDigest equals when comparing signature

2022-09-14 Thread GitBox


naanagon commented on PR #11516:
URL: https://github.com/apache/kafka/pull/11516#issuecomment-1246533634

   @divijvaidya Thank you for your suggestion. Noted.
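
   For context, the point of the suggestion is that 
`java.security.MessageDigest.isEqual` compares byte arrays in constant time, so 
a signature check does not leak how many leading bytes matched. A minimal 
sketch (hypothetical helper, not the actual Connect code):

```java
import java.security.MessageDigest;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class SignatureCheckSketch {
    // Recomputes the HMAC of the payload and compares it to the presented
    // signature. MessageDigest.isEqual takes time independent of where the
    // first mismatching byte is, unlike a naive early-returning loop.
    public static boolean verify(final byte[] key,
                                 final byte[] payload,
                                 final byte[] expectedSignature) throws Exception {
        final Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(key, "HmacSHA256"));
        final byte[] actual = mac.doFinal(payload);
        return MessageDigest.isEqual(actual, expectedSignature);
    }
}
```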





[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Description: (was: Issue:

In case of the revocation of partitions, the "partition-count" metric is 
updated before the new set of assignments is applied: the 
"invokePartitionsRevoked" method called from "onJoinComplete" of the 
"ConsumerCoordinator" class runs before 
"subscriptions.assignFromSubscribed(assignedPartitions)" in the same class. As 
a result, the old assigned partition count keeps being reported even after 
further rebalances.

Demo:

Suppose the current assignment is:

Assigned partitions: [partition-1, partition-2]
Current owned partitions: []
Added partitions (assigned - owned): [partition-1, partition-2]
Revoked partitions (owned - assigned): []

After that, another worker joins, as a result of which “partition-2” has to be 
revoked:

Assigned partitions: [partition-1]
Current owned partitions: [partition-1, partition-2]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [partition-2]

The "assignment" is updated with the new assignment via the following logic:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L463]

Line 463 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||subscriptions.assignFromSubscribed(assignedPartitions);|

But before this, "{*}updatePartitionCount{*}()" has already been called via 
"{*}invokePartitionsRevoked{*}":

[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L443]

Line 443 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||firstException.compareAndSet(null, 
invokePartitionsRevoked(revokedPartitions));|

So when the metric asks the "{*}consumer{*}" for its 
"{*}assignedPartitions{*}" via:
[kafka/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java|https://github.com/apache/kafka/blob/3d2ac7cdbe89cdabfd95db9971de31878afa5498/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L892]

Line 892 in 
[3d2ac7c|https://github.com/apache/kafka/commit/3d2ac7cdbe89cdabfd95db9971de31878afa5498]
||public Set<TopicPartition> assignment() {|

the "{*}assignedPartitions{*}" set is not yet updated.

Solution:
As part of the bug fix, [PR 12622|https://github.com/apache/kafka/pull/12622] 
introduces code changes to update the partition-count metric after the newly 
assigned partitions are registered.)
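
The ordering problem can be reproduced in miniature. A sketch with hypothetical 
names (not the actual Kafka classes): if the metric is updated inside the 
revocation callback, it still sees the old assignment:

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: the metric update inside the revocation callback runs before
// the new assignment is installed, so it reports a stale count.
class CoordinatorSketch {
    private Set<String> assignment = new HashSet<>(List.of("partition-1", "partition-2"));
    private int partitionCountMetric;

    void onJoinComplete(final Set<String> assignedPartitions) {
        invokePartitionsRevoked();                      // metric updated here: still counts 2
        assignment = new HashSet<>(assignedPartitions); // new assignment installed only now
        partitionCountMetric = assignment.size();       // proposed fix: update after installing
    }

    private void invokePartitionsRevoked() {
        partitionCountMetric = assignment.size();       // stale: includes the revoked partition
    }
}
{code}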

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>






[jira] [Resolved] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar resolved KAFKA-14220.
--
  Reviewer:   (was: Chris Egerton)
Resolution: Abandoned

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>





[jira] [Closed] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar closed KAFKA-14220.


> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>





[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Flags:   (was: Patch,Important)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>





[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Labels:   (was: pull-request-available)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>





[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14220 ]


Pritam Kumar deleted comment on KAFKA-14220:
--

was (Author: JIRAUSER295638):
The following is the patch made and verified:
*https://github.com/apache/kafka/pull/12622*

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>  Labels: pull-request-available
>





[jira] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14220 ]


Pritam Kumar deleted comment on KAFKA-14220:
--

was (Author: JIRAUSER295638):
https://github.com/apache/kafka/pull/12622

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>  Labels: pull-request-available
>





[jira] [Updated] (KAFKA-14220) "partition-count" not getting updated after revocation of partitions in case of Incremental Co-operative rebalancing.

2022-09-14 Thread Pritam Kumar (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pritam Kumar updated KAFKA-14220:
-
Component/s: (was: KafkaConnect)

> "partition-count" not getting updated after revocation of partitions in case 
> of Incremental Co-operative rebalancing.
> -
>
> Key: KAFKA-14220
> URL: https://issues.apache.org/jira/browse/KAFKA-14220
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.0.1
>Reporter: Pritam Kumar
>Priority: Major
>





[GitHub] [kafka] kumarpritam863 closed pull request #12622: Update WorkerSinkTask.java

2022-09-14 Thread GitBox


kumarpritam863 closed pull request #12622: Update WorkerSinkTask.java
URL: https://github.com/apache/kafka/pull/12622





[GitHub] [kafka] nizhikov commented on pull request #12632: KAFKA-12878 Support --bootstrap-server in kafka-streams-application-reset tool

2022-09-14 Thread GitBox


nizhikov commented on PR #12632:
URL: https://github.com/apache/kafka/pull/12632#issuecomment-1246427376

   @C0urante Thanks for the review. Fixed all your comments.
   
   > Should we also update the system test
   
   I doubt it.
   
   a. We have compatibility tests (which do not use the streams resetter at the 
moment), see 
[streams_broker_compatibility_test.py](https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py#L82).
   So it's possible that in the future we will try to use 
`kafka-streams-application-reset.sh` against previously released Kafka versions 
which know nothing about the new parameter.
   
   b. At the moment I don't have a test environment to check changes in the 
system tests :).





[GitHub] [kafka] nizhikov commented on a diff in pull request #12632: KAFKA-12878 Support --bootstrap-server in kafka-streams-application-reset tool

2022-09-14 Thread GitBox


nizhikov commented on code in PR #12632:
URL: https://github.com/apache/kafka/pull/12632#discussion_r970460345


##
docs/streams/developer-guide/app-reset-tool.html:
##
@@ -84,9 +84,11 @@ Step 1: Run the application reset 

[GitHub] [kafka] nizhikov commented on a diff in pull request #12632: KAFKA-12878 Support --bootstrap-server in kafka-streams-application-reset tool

2022-09-14 Thread GitBox


nizhikov commented on code in PR #12632:
URL: https://github.com/apache/kafka/pull/12632#discussion_r970457373


##
core/src/main/scala/kafka/tools/StreamsResetter.java:
##
@@ -213,8 +217,12 @@ private void parseArguments(final String[] args) {
         .ofType(String.class)
         .describedAs("id")
         .required();
-    bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2")
-        .withRequiredArg()
+    bootstrapServerOption = optionParser.accepts("bootstrap-server", "The server(s) to use for bootstrapping.")

Review Comment:
   Thanks for the hint. Applied the code you suggested.
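
   For reference, a sketch of how such a flag rename can be handled with 
joptsimple while keeping the old flag as a deprecated alias (hypothetical 
class, assuming backward compatibility is wanted; not necessarily what the PR 
does):

```java
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

public class ResetterOptionsSketch {
    public static void main(final String[] args) {
        final OptionParser parser = new OptionParser();
        // New, consistent flag name.
        final OptionSpec<String> bootstrapServer = parser
            .accepts("bootstrap-server", "The server(s) to use for bootstrapping.")
            .withRequiredArg()
            .ofType(String.class)
            .describedAs("server to connect to");
        // Old flag kept as a deprecated alias so existing scripts keep working.
        final OptionSpec<String> bootstrapServers = parser
            .accepts("bootstrap-servers", "DEPRECATED: use --bootstrap-server instead.")
            .withRequiredArg()
            .ofType(String.class)
            .describedAs("server to connect to");

        final OptionSet options = parser.parse(args);
        final String servers = options.has(bootstrapServer)
            ? options.valueOf(bootstrapServer)
            : options.valueOf(bootstrapServers);
        System.out.println("bootstrap servers: " + servers);
    }
}
```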






[GitHub] [kafka] cadonna merged pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

2022-09-14 Thread GitBox


cadonna merged PR #12600:
URL: https://github.com/apache/kafka/pull/12600





[GitHub] [kafka] cadonna commented on pull request #12600: KAFKA-10199: Suspend tasks in the state updater on revocation

2022-09-14 Thread GitBox


cadonna commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1246332926

   Build failures are unrelated:
   ```
   Build / JDK 8 and Scala 2.12 / kafka.test.ClusterTestExtensionsTest.[1] 
Type=ZK, Name=Generated Test, MetadataVersion=3.3-IV3, Security=PLAINTEXT
   Build / JDK 17 and Scala 2.13 / 
org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()
   ```
   





[jira] [Comment Edited] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated

2022-09-14 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603914#comment-17603914
 ] 

Bruno Cadonna edited comment on KAFKA-8115 at 9/14/22 7:01 AM:
---

Failed again with timeout on JDK17: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12600/7/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated___2/


was (Author: cadonna):
Failed again on JDK17: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12600/7/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated___2/


[jira] [Commented] (KAFKA-8115) Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated

2022-09-14 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17603914#comment-17603914
 ] 

Bruno Cadonna commented on KAFKA-8115:
--

Failed again on JDK17: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12600/7/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/Build___JDK_17_and_Scala_2_13___testTaskRequestWithOldStartMsGetsUpdated___2/

> Flaky Test CoordinatorTest#testTaskRequestWithOldStartMsGetsUpdated
> ---
>
> Key: KAFKA-8115
> URL: https://issues.apache.org/jira/browse/KAFKA-8115
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3254/testReport/junit/org.apache.kafka.trogdor.coordinator/CoordinatorTest/testTaskRequestWithOldStartMsGetsUpdated/]
> {quote}org.junit.runners.model.TestTimedOutException: test timed out after 
> 12 milliseconds at java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native 
> Method) at 
> java.base@11.0.1/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:234)
>  at 
> java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2123)
>  at 
> java.base@11.0.1/java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1454)
>  at 
> java.base@11.0.1/java.util.concurrent.Executors$DelegatedExecutorService.awaitTermination(Executors.java:709)
>  at 
> app//org.apache.kafka.trogdor.rest.JsonRestServer.waitForShutdown(JsonRestServer.java:157)
>  at app//org.apache.kafka.trogdor.agent.Agent.waitForShutdown(Agent.java:123) 
> at 
> app//org.apache.kafka.trogdor.common.MiniTrogdorCluster.close(MiniTrogdorCluster.java:285)
>  at 
> app//org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated(CoordinatorTest.java:596)
>  at 
> java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method) at 
> java.base@11.0.1/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> java.base@11.0.1/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@11.0.1/java.lang.reflect.Method.invoke(Method.java:566) at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>  at 
> app//org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>  at java.base@11.0.1/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
> at java.base@11.0.1/java.lang.Thread.run(Thread.java:834){quote}
> STDOUT
> {quote}[2019-03-15 09:23:41,364] INFO Creating MiniTrogdorCluster with 
> agents: node02 and coordinator: node01 
> (org.apache.kafka.trogdor.common.MiniTrogdorCluster:135) [2019-03-15 
> 09:23:41,595] INFO Logging initialized @13340ms to 
> org.eclipse.jetty.util.log.Slf4jLog (org.eclipse.jetty.util.log:193) 
> [2019-03-15 09:23:41,752] INFO Starting REST server 
> (org.apache.kafka.trogdor.rest.JsonRestServer:89) [2019-03-15 09:23:41,912] 
> INFO Registered resource 
> org.apache.kafka.trogdor.agent.AgentRestResource@3fa38ceb 
> (org.apache.kafka.trogdor.rest.JsonRestServer:94) [2019-03-15 09:23:42,178] 
> INFO jetty-9.4.14.v20181114; built: 2018-11-14T21:20:31.478Z; git: 
> c4550056e785fb5665914545889f21dc136ad9e6; jvm 11.0.1+13-LTS 
> (org.eclipse.jetty.server.Server:370) [2019-03-15 09:23:42,360] INFO 
> DefaultSessionIdManager workerName=node0 
> (org.eclipse.jetty.server.session:365) [2019-03-15 09:23:42,362] INFO No 
> SessionScavenger set, using defaults (org.eclipse.jetty.server.session:370) 
> [2019-03-15 09:23:42,370] INFO node0 Scavenging every 66ms 
> (org.eclipse.jetty.server.session:149) [2019-03-15 09:23:44,412] INFO Started 
> o.e.j.s.ServletContextHandler@335a5293\{/,null,AVAILABLE} 
> (org.eclipse.jetty.server.handler.ContextHandler:855) [2019-03-15 
> 09:23:44,473] INFO Started 
> ServerConnector@79a93bf1\{HTTP/1.1,[http/1.1]}{0.0.0.0:33477} 
> (org.eclipse.jetty.server.AbstractConnector:292) [2019-03-15 09:23:44,474] 
> INFO Started @16219ms (org.eclipse.jetty.server.Server:407) [2019-03-15 
> 09:23:44,475] INFO REST server listening at [http://127.0.1.1:33477/] 
> 
