[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key The configuration key for the class
+     * @param t   The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
   I went ahead and extended the try/catch to enclose the  `o = 
Utils.newInstance((String) klass, t);` as well.
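
   Roughly, that means moving the guard up so that construction and configuration sit 
inside one block. A minimal sketch of the shape, assuming the existing `maybeClose` 
helper and `Utils.newInstance` usage in `AbstractConfig` (not the exact patch text):

   ```java
   Object o = null;
   try {
       o = Utils.newInstance((String) klass, t);
       if (o instanceof Configurable)
           ((Configurable) o).configure(configPairs);
   } catch (ClassNotFoundException e) {
       throw new KafkaException("Class " + klass + " cannot be found", e);
   } catch (Exception e) {
       // close the partially constructed instance before rethrowing
       maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
       throw e;
   }
   ```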



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092652543


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key The configuration key for the class
+     * @param t   The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
   I've updated accordingly.  However, this assumes no resource leakage within 
the constructor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on pull request #13178: KAFKA-12468, KAFKA-14663, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-01-31 Thread via GitHub


gharris1727 commented on PR #13178:
URL: https://github.com/apache/kafka/pull/13178#issuecomment-1411301405

   After #13181 is merged, I'll rebase and remove my fairness patch.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

2023-01-31 Thread via GitHub


gharris1727 commented on code in PR #13181:
URL: https://github.com/apache/kafka/pull/13181#discussion_r1092643889


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -87,6 +90,7 @@ public MirrorSourceTask() {}
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();

Review Comment:
   nit: findbugs doesn't complain about this not being synchronized? you must 
be on better terms with it than I am.



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java:
##
@@ -194,41 +200,63 @@ public void commitRecord(SourceRecord record, RecordMetadata metadata) {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-                                  long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
                 partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-                                   long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.debug("No more pending offset syncs");

Review Comment:
   nit: these log statements could be spammy if they're called on every 
commitRecord, wdyt about `trace`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092632585


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -414,34 +414,51 @@ private <T> T getConfiguredInstance(Object klass, Class<T> t, Map<String, Object> configPairs) {
     public <T> T getConfiguredInstance(String key, Class<T> t) {
-        return getConfiguredInstance(key, t, Collections.emptyMap());
+        T configuredInstance = null;
+
+        try {
+            configuredInstance = getConfiguredInstance(key, t, Collections.emptyMap());
+        } catch (Exception e) {
+            maybeClose(configuredInstance, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
+            throw e;
+        }
+
+        return configuredInstance;
     }
 
     /**
      * Get a configured instance of the give class specified by the given configuration key. If the object implements
      * Configurable configure it using the configuration.
      *
-     * @param key The configuration key for the class
-     * @param t The interface the class should implement
+     * @param key The configuration key for the class
+     * @param t   The interface the class should implement
      * @param configOverrides override origin configs
      * @return A configured instance of the class
      */
     public <T> T getConfiguredInstance(String key, Class<T> t, Map<String, Object> configOverrides) {
         Class<?> c = getClass(key);
+        T configuredInstance = null;
 
-        return getConfiguredInstance(c, t, originals(configOverrides));
+        try {
+            configuredInstance = getConfiguredInstance(c, t, originals(configOverrides));
+        } catch (Exception e) {

Review Comment:
   If we do the try/catch here, it doesn't work; `configuredInstance` is 
guaranteed to be null in the catch block.
   
   I was thinking of something like this:
   ```java
   if (o instanceof Configurable) {
       try {
           ((Configurable) o).configure(configPairs);
       } catch (Exception e) {
           maybeClose(o, "AutoCloseable object constructed and configured during failed call to getConfiguredInstance");
           throw e;
       }
   }
   ```
   
   being added to [these 
lines](https://github.com/apache/kafka/blob/7d61d4505a16f09b85f5eb37adeff9c3534ec02c/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L406-L407)
 in the other variant of `getConfiguredInstance`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante opened a new pull request, #13181: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method

2023-01-31 Thread via GitHub


C0urante opened a new pull request, #13181:
URL: https://github.com/apache/kafka/pull/13181

   The [Jira](https://issues.apache.org/jira/browse/KAFKA-14610) contains a 
more detailed description of the motivation for this change. To summarize, if 
there are bursty topic partitions, offset sync starvation can occur since MM2 
limits the number of in-flight syncs at one time.
   
   The fix here is to enqueue the latest applicable offset for any to-be-synced 
topic partition in the `commitRecord` method, immediately fire off all syncs 
that can be published in that method, and then periodically re-check for new 
syncs in `commit` in case there are syncs that could not be published due to, 
e.g., a burst of throughput on a topic partition.
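
   A condensed sketch of that queue-and-fire shape is below; the class and method names 
here are illustrative assumptions for the example, not the PR's actual code:

   ```java
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.concurrent.Semaphore;

   import org.apache.kafka.common.TopicPartition;

   class OffsetSyncQueue {
       private final Map<TopicPartition, Long> pendingSyncs = new LinkedHashMap<>();
       private final Semaphore outstandingSyncs = new Semaphore(10); // cap on in-flight syncs

       // commitRecord(): remember only the latest downstream offset per partition
       synchronized void queue(TopicPartition partition, long downstreamOffset) {
           pendingSyncs.put(partition, downstreamOffset);
       }

       // called from both commitRecord() and commit(): publish whatever the semaphore allows
       void firePending(OffsetSyncPublisher publisher) {
           while (true) {
               Map.Entry<TopicPartition, Long> next;
               synchronized (this) {
                   Iterator<Map.Entry<TopicPartition, Long>> it = pendingSyncs.entrySet().iterator();
                   if (!it.hasNext())
                       return; // nothing left to sync
                   if (!outstandingSyncs.tryAcquire())
                       return; // too many in flight; retry on the next commit()
                   next = it.next();
                   it.remove();
               }
               publisher.publish(next.getKey(), next.getValue(), outstandingSyncs::release);
           }
       }

       interface OffsetSyncPublisher {
           void publish(TopicPartition partition, long downstreamOffset, Runnable onCompletion);
       }
   }
   ```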
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-01-31 Thread via GitHub


guozhangwang commented on code in PR #13025:
URL: https://github.com/apache/kafka/pull/13025#discussion_r1092598270


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##
@@ -190,7 +190,7 @@ public void clearTaskTimeout() {
 
     @Override
     public boolean commitNeeded() {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.commitNeeded();

Review Comment:
   Why do we need to change these two functions?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##
@@ -258,21 +269,27 @@ private List<TaskAndAction> getTasksAndActions() {
     }
 
     private void addTask(final Task task) {
+        final TaskId taskId = task.id();
         if (isStateless(task)) {
             addToRestoredTasks((StreamTask) task);
-            log.info("Stateless active task " + task.id() + " was added to the restored tasks of the state updater");
+            log.info("Stateless active task " + taskId + " was added to the restored tasks of the state updater");
+        } else if (topologyMetadata.isPaused(taskId.topologyName())) {
+            pausedTasks.put(taskId, task);

Review Comment:
   I'm wondering if this complexity is necessary, since we do not make strict 
ordering guarantees for paused topologies -- i.e. it's okay to keep processing 
those tasks for a while after the `pause()` call is triggered. Is it really a 
correctness or concurrency issue?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1534,7 +1540,13 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
-        return tasks.allTasksPerId();
+        if (stateUpdater != null) {
+            final Map<TaskId, Task> ret = stateUpdater.getTasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+            ret.putAll(tasks.allTasksPerId());

Review Comment:
   I've changed the func name slightly in another PR, so if that PR is merged 
we need to do a slight rebase/conflict resolution, just FYI.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR

2023-01-31 Thread Calvin Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682775#comment-17682775
 ] 

Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:38 PM:
--

Hi [~adupriez] , Thanks for checking this issue. I have been working on a KIP 
for this one for a while. It will have protocol changes to include the broker 
epoch in the AlterPartition and Fetch requests. Will share more details when 
the KIP is published. 

Sorry I did not assign the ticket to me earlier. Can you assign the ticket to 
me?


was (Author: JIRAUSER298384):
Hi [~adupriez] , Thanks for checking this issue. I have been working on a KIP 
for this one for a while. It will have protocol changes to include the broker 
epoch in the AlterPartition and Fetch requests. Will share more details when 
the KIP is published. 

> Replaced disk can lead to loss of committed data even with non-empty ISR
> 
>
> Key: KAFKA-14139
> URL: https://issues.apache.org/jira/browse/KAFKA-14139
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Alexandre Dupriez
>Priority: Major
> Fix For: 3.5.0
>
>
> We have been thinking about disk failure cases recently. Suppose that a disk 
> has failed and the user needs to restart the disk from an empty state. The 
> concern is whether this can lead to the unnecessary loss of committed data.
> For normal topic partitions, removal from the ISR during controlled shutdown 
> buys us some protection. After the replica is restarted, it must prove its 
> state to the leader before it can be added back to the ISR. And it cannot 
> become a leader until it does so.
> An obvious exception to this is when the replica is the last member in the 
> ISR. In this case, the disk failure itself has compromised the committed 
> data, so some amount of loss must be expected.
> We have been considering other scenarios in which the loss of one disk can 
> lead to data loss even when there are replicas remaining which have all of 
> the committed entries. One such scenario is this:
> Suppose we have a partition with two replicas: A and B. Initially A is the 
> leader and it is the only member of the ISR.
>  # Broker B catches up to A, so A attempts to send an AlterPartition request 
> to the controller to add B into the ISR.
>  # Before the AlterPartition request is received, replica B has a hard 
> failure.
>  # The current controller successfully fences broker B. It takes no action on 
> this partition since B is already out of the ISR.
>  # Before the controller receives the AlterPartition request to add B, it 
> also fails.
>  # While the new controller is initializing, suppose that replica B finishes 
> startup, but the disk has been replaced (all of the previous state has been 
> lost).
>  # The new controller sees the registration from broker B first.
>  # Finally, the AlterPartition from A arrives which adds B back into the ISR 
> even though it has an empty log.
> (Credit for coming up with this scenario goes to [~junrao] .)
> I tested this in KRaft and confirmed that this sequence is possible (even if 
> perhaps unlikely). There are a few ways we could have potentially detected 
> the issue. First, perhaps the leader should have bumped the leader epoch on 
> all partitions when B was fenced. Then the inflight AlterPartition would be 
> doomed no matter when it arrived.
> Alternatively, we could have relied on the broker epoch to distinguish the 
> dead broker's state from that of the restarted broker. This could be done by 
> including the broker epoch in both the `Fetch` request and in 
> `AlterPartition`.
> Finally, perhaps even normal kafka replication should be using a unique 
> identifier for each disk so that we can reliably detect when it has changed. 
> For example, something like what was proposed for the metadata quorum here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR

2023-01-31 Thread Calvin Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682775#comment-17682775
 ] 

Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:13 PM:
--

Hi [~adupriez] , Thanks for checking this issue. I have been working on a KIP 
for this one for a while. It will have protocol changes to include the broker 
epoch in the AlterPartition and Fetch requests. Will share more details when 
the KIP is published. 


was (Author: JIRAUSER298384):
Hi [~adupriez] , Thanks for checking this issue. I have been working on a KIP 
for this one for a while. Including the broker epoch in the AlterPartition and 
Fetch request is preferable. Will share more details when the KIP is published. 

> Replaced disk can lead to loss of committed data even with non-empty ISR
> 
>
> Key: KAFKA-14139
> URL: https://issues.apache.org/jira/browse/KAFKA-14139
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Alexandre Dupriez
>Priority: Major
> Fix For: 3.5.0
>
>
> We have been thinking about disk failure cases recently. Suppose that a disk 
> has failed and the user needs to restart the disk from an empty state. The 
> concern is whether this can lead to the unnecessary loss of committed data.
> For normal topic partitions, removal from the ISR during controlled shutdown 
> buys us some protection. After the replica is restarted, it must prove its 
> state to the leader before it can be added back to the ISR. And it cannot 
> become a leader until it does so.
> An obvious exception to this is when the replica is the last member in the 
> ISR. In this case, the disk failure itself has compromised the committed 
> data, so some amount of loss must be expected.
> We have been considering other scenarios in which the loss of one disk can 
> lead to data loss even when there are replicas remaining which have all of 
> the committed entries. One such scenario is this:
> Suppose we have a partition with two replicas: A and B. Initially A is the 
> leader and it is the only member of the ISR.
>  # Broker B catches up to A, so A attempts to send an AlterPartition request 
> to the controller to add B into the ISR.
>  # Before the AlterPartition request is received, replica B has a hard 
> failure.
>  # The current controller successfully fences broker B. It takes no action on 
> this partition since B is already out of the ISR.
>  # Before the controller receives the AlterPartition request to add B, it 
> also fails.
>  # While the new controller is initializing, suppose that replica B finishes 
> startup, but the disk has been replaced (all of the previous state has been 
> lost).
>  # The new controller sees the registration from broker B first.
>  # Finally, the AlterPartition from A arrives which adds B back into the ISR 
> even though it has an empty log.
> (Credit for coming up with this scenario goes to [~junrao] .)
> I tested this in KRaft and confirmed that this sequence is possible (even if 
> perhaps unlikely). There are a few ways we could have potentially detected 
> the issue. First, perhaps the leader should have bumped the leader epoch on 
> all partitions when B was fenced. Then the inflight AlterPartition would be 
> doomed no matter when it arrived.
> Alternatively, we could have relied on the broker epoch to distinguish the 
> dead broker's state from that of the restarted broker. This could be done by 
> including the broker epoch in both the `Fetch` request and in 
> `AlterPartition`.
> Finally, perhaps even normal kafka replication should be using a unique 
> identifier for each disk so that we can reliably detect when it has changed. 
> For example, something like what was proposed for the metadata quorum here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR

2023-01-31 Thread Calvin Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682775#comment-17682775
 ] 

Calvin Liu edited comment on KAFKA-14139 at 1/31/23 10:01 PM:
--

Hi [~adupriez] , Thanks for checking this issue. I have been working on a KIP 
for this one for a while. Including the broker epoch in the AlterPartition and 
Fetch request is preferable. Will share more details when the KIP is published. 


was (Author: JIRAUSER298384):
Hi [~adupriez] , Thanks for checking this issue. I have been working on a KIP 
for this one for a while. It is almost done, will keep you posted. 

> Replaced disk can lead to loss of committed data even with non-empty ISR
> 
>
> Key: KAFKA-14139
> URL: https://issues.apache.org/jira/browse/KAFKA-14139
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Alexandre Dupriez
>Priority: Major
> Fix For: 3.5.0
>
>
> We have been thinking about disk failure cases recently. Suppose that a disk 
> has failed and the user needs to restart the disk from an empty state. The 
> concern is whether this can lead to the unnecessary loss of committed data.
> For normal topic partitions, removal from the ISR during controlled shutdown 
> buys us some protection. After the replica is restarted, it must prove its 
> state to the leader before it can be added back to the ISR. And it cannot 
> become a leader until it does so.
> An obvious exception to this is when the replica is the last member in the 
> ISR. In this case, the disk failure itself has compromised the committed 
> data, so some amount of loss must be expected.
> We have been considering other scenarios in which the loss of one disk can 
> lead to data loss even when there are replicas remaining which have all of 
> the committed entries. One such scenario is this:
> Suppose we have a partition with two replicas: A and B. Initially A is the 
> leader and it is the only member of the ISR.
>  # Broker B catches up to A, so A attempts to send an AlterPartition request 
> to the controller to add B into the ISR.
>  # Before the AlterPartition request is received, replica B has a hard 
> failure.
>  # The current controller successfully fences broker B. It takes no action on 
> this partition since B is already out of the ISR.
>  # Before the controller receives the AlterPartition request to add B, it 
> also fails.
>  # While the new controller is initializing, suppose that replica B finishes 
> startup, but the disk has been replaced (all of the previous state has been 
> lost).
>  # The new controller sees the registration from broker B first.
>  # Finally, the AlterPartition from A arrives which adds B back into the ISR 
> even though it has an empty log.
> (Credit for coming up with this scenario goes to [~junrao] .)
> I tested this in KRaft and confirmed that this sequence is possible (even if 
> perhaps unlikely). There are a few ways we could have potentially detected 
> the issue. First, perhaps the leader should have bumped the leader epoch on 
> all partitions when B was fenced. Then the inflight AlterPartition would be 
> doomed no matter when it arrived.
> Alternatively, we could have relied on the broker epoch to distinguish the 
> dead broker's state from that of the restarted broker. This could be done by 
> including the broker epoch in both the `Fetch` request and in 
> `AlterPartition`.
> Finally, perhaps even normal kafka replication should be using a unique 
> identifier for each disk so that we can reliably detect when it has changed. 
> For example, something like what was proposed for the metadata quorum here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14139) Replaced disk can lead to loss of committed data even with non-empty ISR

2023-01-31 Thread Calvin Liu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682775#comment-17682775
 ] 

Calvin Liu commented on KAFKA-14139:


Hi [~adupriez] , Thanks for checking this issue. I have been working on a KIP 
for this one for a while. It is almost done, will keep you posted. 

> Replaced disk can lead to loss of committed data even with non-empty ISR
> 
>
> Key: KAFKA-14139
> URL: https://issues.apache.org/jira/browse/KAFKA-14139
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Alexandre Dupriez
>Priority: Major
> Fix For: 3.5.0
>
>
> We have been thinking about disk failure cases recently. Suppose that a disk 
> has failed and the user needs to restart the disk from an empty state. The 
> concern is whether this can lead to the unnecessary loss of committed data.
> For normal topic partitions, removal from the ISR during controlled shutdown 
> buys us some protection. After the replica is restarted, it must prove its 
> state to the leader before it can be added back to the ISR. And it cannot 
> become a leader until it does so.
> An obvious exception to this is when the replica is the last member in the 
> ISR. In this case, the disk failure itself has compromised the committed 
> data, so some amount of loss must be expected.
> We have been considering other scenarios in which the loss of one disk can 
> lead to data loss even when there are replicas remaining which have all of 
> the committed entries. One such scenario is this:
> Suppose we have a partition with two replicas: A and B. Initially A is the 
> leader and it is the only member of the ISR.
>  # Broker B catches up to A, so A attempts to send an AlterPartition request 
> to the controller to add B into the ISR.
>  # Before the AlterPartition request is received, replica B has a hard 
> failure.
>  # The current controller successfully fences broker B. It takes no action on 
> this partition since B is already out of the ISR.
>  # Before the controller receives the AlterPartition request to add B, it 
> also fails.
>  # While the new controller is initializing, suppose that replica B finishes 
> startup, but the disk has been replaced (all of the previous state has been 
> lost).
>  # The new controller sees the registration from broker B first.
>  # Finally, the AlterPartition from A arrives which adds B back into the ISR 
> even though it has an empty log.
> (Credit for coming up with this scenario goes to [~junrao] .)
> I tested this in KRaft and confirmed that this sequence is possible (even if 
> perhaps unlikely). There are a few ways we could have potentially detected 
> the issue. First, perhaps the leader should have bumped the leader epoch on 
> all partitions when B was fenced. Then the inflight AlterPartition would be 
> doomed no matter when it arrived.
> Alternatively, we could have relied on the broker epoch to distinguish the 
> dead broker's state from that of the restarted broker. This could be done by 
> including the broker epoch in both the `Fetch` request and in 
> `AlterPartition`.
> Finally, perhaps even normal kafka replication should be using a unique 
> identifier for each disk so that we can reliably detect when it has changed. 
> For example, something like what was proposed for the metadata quorum here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-853%3A+KRaft+Voter+Changes].



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vvcephei merged pull request #12879: KAFKA-14409: Clean ProcessorParameters from casting

2023-01-31 Thread via GitHub


vvcephei merged PR #12879:
URL: https://github.com/apache/kafka/pull/12879


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #12879: KAFKA-14409: Clean ProcessorParameters from casting

2023-01-31 Thread via GitHub


vvcephei commented on PR #12879:
URL: https://github.com/apache/kafka/pull/12879#issuecomment-1411120341

   There was only one test failure, which was in an unrelated component: 
`[Build / JDK 8 and Scala 2.12 / 
kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-12879/4/testReport/kafka.server/DynamicBrokerReconfigurationTest/Build___JDK_8_and_Scala_2_12___testTrustStoreAlter_String__quorum_kraft/)`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah opened a new pull request, #13180: Add a summarizer for the metadata migration

2023-01-31 Thread via GitHub


mumrah opened a new pull request, #13180:
URL: https://github.com/apache/kafka/pull/13180

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-01-31 Thread via GitHub


vamossagar12 commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-146347

   Thanks @fvaleri , I made the changes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092516181


##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
     private final int defaultApiTimeoutMs = 6;
     private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
+    private final int targetInterceptor = 3;

Review Comment:
   Just an observation, I see the following variable is used in only one test 
case  `private final int throttleMs = 10;` as well.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092513735


##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  org.apache.kafka.test.MockConsumerInterceptor.class);
+        testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
+                + TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR,  org.apache.kafka.test.MockProducerInterceptor.class);

Review Comment:
   @C0urante  Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14487) Move LogManager to storage module

2023-01-31 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682769#comment-17682769
 ] 

Sagar Rao edited comment on KAFKA-14487 at 1/31/23 9:22 PM:


I started looking at this. It depends on 
https://issues.apache.org/jira/browse/KAFKA-14603. There's also a Pool.scala 
class which needs to be migrated and which seems to be used in multiple places. 
I am not aware of a Java equivalent class for it. Should we look to migrate 
this class to Java? 


was (Author: sagarrao):
I started looking at this. It depends on 
https://issues.apache.org/jira/browse/KAFKA-14603. There's also a Pool.scala 
class which needs to be migrated which seems to be used in multiple places? I 
am not aware if a java equivalent class of it exists. I can start migrating 
that first if needed.

> Move LogManager to storage module
> -
>
> Key: KAFKA-14487
> URL: https://issues.apache.org/jira/browse/KAFKA-14487
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Sagar Rao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14487) Move LogManager to storage module

2023-01-31 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682769#comment-17682769
 ] 

Sagar Rao commented on KAFKA-14487:
---

I started looking at this. It depends on 
https://issues.apache.org/jira/browse/KAFKA-14603. There's also a Pool.scala 
class which needs to be migrated and which seems to be used in multiple places. 
I am not aware of a Java equivalent class for it. I can start migrating 
that first if needed.

> Move LogManager to storage module
> -
>
> Key: KAFKA-14487
> URL: https://issues.apache.org/jira/browse/KAFKA-14487
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Sagar Rao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] pprovenzano commented on pull request #13114: KAFKA-14084: SCRAM support in KRaft.

2023-01-31 Thread via GitHub


pprovenzano commented on PR #13114:
URL: https://github.com/apache/kafka/pull/13114#issuecomment-1411037660

   I just updated AlterUserScramCredentialsRequestTest to work in KRaft after 
finding it while looking into the tests you wanted. It covers a lot of what you 
want for testing ScramControlManager and ControllerApis, given that it is an 
end-to-end test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-31 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092446989


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -573,7 +591,20 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private val listenBacklogSize = config.socketListenBacklogSize
 
   private val nioSelector = NSelector.open()
-  private[network] val serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
+
+  // If the port is configured as 0, we are using a random (ephemeral) port, so we need to open
+  // the socket before we can find out what port we have. If it is set to a nonzero value, defer
+  // opening the socket until we start the Acceptor. The reason for deferring the socket opening
+  // is so that systems which assume that the socket being open indicates readiness are not
+  // confused.
+  private[network] var serverChannel: ServerSocketChannel  = _
+  private[network] val localPort: Int  = if (endPoint.port != 0) {

Review Comment:
   good idea. let me wait for this test run to complete then I'll add a log 
message



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cmccabe commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-31 Thread via GitHub


cmccabe commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092445406


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
-  private[network] val startFuture = new CompletableFuture[Void]()
+  private[network] val startedFuture = new CompletableFuture[Void]()
 
   val thread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
-  startFuture.thenRun(() => synchronized {
-    if (!shouldRun.get()) {
-      debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
-    } else {
+  def start(): Unit = synchronized {
+    try {
+      if (!shouldRun.get()) {
+        throw new ClosedChannelException()

Review Comment:
   yeah, that's right.
   
   For example, if there was some error during broker startup, we would call 
shutdown and potentially shut down the SocketServer before the Acceptors had 
been started.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #13169: KAFKA-14658: Do not open broker ports until we are ready to accept traffic

2023-01-31 Thread via GitHub


mumrah commented on code in PR #13169:
URL: https://github.com/apache/kafka/pull/13169#discussion_r1092435602


##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -585,23 +616,35 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new mutable.PriorityQueue[DelayedCloseSocket]()
   private var started = false
-  private[network] val startFuture = new CompletableFuture[Void]()
+  private[network] val startedFuture = new CompletableFuture[Void]()
 
   val thread = KafkaThread.nonDaemon(
     s"${threadPrefix()}-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
-  startFuture.thenRun(() => synchronized {
-    if (!shouldRun.get()) {
-      debug(s"Ignoring start future for ${endPoint.listenerName} since the acceptor has already been shut down.")
-    } else {
+  def start(): Unit = synchronized {
+    try {
+      if (!shouldRun.get()) {
+        throw new ClosedChannelException()

Review Comment:
   How does this happen? If the kafka server is shutdown before the socket 
server is able to finish starting?



##
core/src/main/scala/kafka/network/SocketServer.scala:
##
@@ -573,7 +591,20 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
   private val listenBacklogSize = config.socketListenBacklogSize
 
   private val nioSelector = NSelector.open()
-  private[network] val serverChannel = openServerSocket(endPoint.host, endPoint.port, listenBacklogSize)
+
+  // If the port is configured as 0, we are using a random (ephemeral) port, so we need to open
+  // the socket before we can find out what port we have. If it is set to a nonzero value, defer
+  // opening the socket until we start the Acceptor. The reason for deferring the socket opening
+  // is so that systems which assume that the socket being open indicates readiness are not
+  // confused.
+  private[network] var serverChannel: ServerSocketChannel  = _
+  private[network] val localPort: Int  = if (endPoint.port != 0) {

Review Comment:
   It might be good to log when we are opening the socket server early.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-01-31 Thread via GitHub


guozhangwang commented on PR #12654:
URL: https://github.com/apache/kafka/pull/12654#issuecomment-1410999341

   @mjsax I tried to resolve the long commit history but it seems not possible, 
so I created a new PR (with the same branch name) here: 
https://github.com/apache/kafka/pull/13179. It's ready for another look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang opened a new pull request, #13179: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-01-31 Thread via GitHub


guozhangwang opened a new pull request, #13179:
URL: https://github.com/apache/kafka/pull/13179

   1. Add the new API (default impl is empty) to StateRestoreListener.
   2. Update related unit tests
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang closed pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-01-31 Thread via GitHub


guozhangwang closed pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to 
StateRestoreListener
URL: https://github.com/apache/kafka/pull/12654


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-31 Thread via GitHub


C0urante commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1092418826


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   This gives us 30 seconds for each write instead of 30 seconds total, and it 
doesn't take into account reading to the end of the log after writes have 
finished. Considering this is all taking place on the herder's tick thread, we 
should probably care about the difference.
   
   We might be able to use the [Timer 
class](https://github.com/apache/kafka/blob/eb7f490159c924ca0f21394d58366c257998f52e/clients/src/main/java/org/apache/kafka/common/utils/Timer.java)
 to simplify some of this logic.
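
   For illustration, a single shared deadline with that class could look roughly like 
this (a sketch only, reusing the names from the diff above):

   ```java
   Timer timer = Time.SYSTEM.timer(READ_WRITE_TIMEOUT_MS);
   for (Future<RecordMetadata> future : producerFutures) {
       // each get() draws down the same overall budget instead of restarting the clock
       future.get(timer.remainingMs(), TimeUnit.MILLISECONDS);
       timer.update();
   }
   // whatever is left of the budget can then be spent on reading to the end of the log
   configLog.readToEnd().get(timer.remainingMs(), TimeUnit.MILLISECONDS);
   ```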



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Oof, that's a lot of context 😄
   
   Thinking about it some more, I'm hesitant to make significant changes to the 
exception mapper without a KIP since it's a crucial part of our API and there 
may be automated tooling (like K8s operators) built around the current 
behavior, and adding full stack traces and/or caused-by chains could make 
things less readable and human-friendly, especially for new users.
   
   Asking people to check the worker logs isn't a terrible solution, though it 
might be a bit tricky to make sure that that message reaches users regardless 
of whether EOS source support is enabled or disabled.
   
   An alternative could be to handle this case specially by instantiating an 
exception whose message contains information on its cause. For example, the 
message could be `"Failed to write task configs to Kafka. Caused by 
org.apache.kafka.common.errors.AuthorizationException: Not authorized to access 
topics: connect-configs"`.
   
   Regardless, this can and probably should be left as a follow-up since it's 
its own can of worms and doesn't have to block this PR.
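
   Concretely, that special-case handling could be as small as something like the 
following (illustrative only; the exact exception type and message wording would be 
part of that follow-up):

   ```java
   } catch (ExecutionException e) {
       Throwable cause = e.getCause() != null ? e.getCause() : e;
       // put the cause into the message itself so it survives the REST exception mapper,
       // while still chaining the original exception for the worker logs
       throw new ConnectException("Failed to write task configs to Kafka. Caused by " + cause, cause);
   }
   ```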



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14666) MM2 should translate consumer group offsets behind replication flow

2023-01-31 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14666:
---

 Summary: MM2 should translate consumer group offsets behind 
replication flow
 Key: KAFKA-14666
 URL: https://issues.apache.org/jira/browse/KAFKA-14666
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 3.5.0
Reporter: Greg Harris


MirrorMaker2 includes an offset translation feature which can translate the 
offsets for an upstream consumer group to a corresponding downstream consumer 
group. It does this by keeping a topic of offset-syncs to correlate upstream 
and downstream offsets, and translates any source offsets which are ahead of 
the replication flow.

However, if a replication flow is closer to the end of a topic than the 
consumer group, then the offset translation feature will refuse to translate 
the offset for correctness reasons. This is because the MirrorCheckpointTask 
only keeps the latest offset correlation between source and target, so it does 
not have sufficient information to translate older offsets.

The workarounds for this issue are to:
1. Pause the replication flow occasionally to allow the source to get ahead of 
MM2
2. Increase the offset.lag.max to delay offset syncs, increasing the window for 
translation to happen. With the fix for KAFKA-12468, this will also increase 
the lag of applications that are ahead of the replication flow, so this is a 
tradeoff.

Instead, the MirrorCheckpointTask should provide correct and best-effort 
translation for consumer groups behind the replication flow by keeping 
additional state, or re-reading the offset-syncs topic. This should be a 
substantial improvement for use-cases where applications have a higher latency 
to commit than the replication flow, or where applications are reading from the 
earliest offset.
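
For reference, translation from a single offset sync boils down to roughly the following 
(a simplified sketch of the idea, not MirrorCheckpointTask's exact code):

{code:java}
// Project an upstream consumer offset downstream using the most recent offset sync.
// Upstream offsets behind the sync have no usable reference point, which is the
// limitation described above.
long translateDownstream(long upstreamConsumerOffset, long syncUpstreamOffset, long syncDownstreamOffset) {
    if (upstreamConsumerOffset < syncUpstreamOffset)
        throw new IllegalStateException("Consumer group offset is behind the latest offset sync");
    return syncDownstreamOffset + (upstreamConsumerOffset - syncUpstreamOffset);
}
{code}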



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-01-31 Thread via GitHub


guozhangwang commented on PR #12654:
URL: https://github.com/apache/kafka/pull/12654#issuecomment-1410947553

   > Why has this PR 392 commits?
   
   I created this branch / PR around Oct. 2022, and then I rebased on top of 
trunk. My local branch is in the right place, but it seems GitHub cannot 
recognize the rebasing. That should be okay, since merging will squash 
everything into a single commit anyway.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-01-31 Thread via GitHub


guozhangwang commented on code in PR #12654:
URL: https://github.com/apache/kafka/pull/12654#discussion_r1092384524


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java:
##
@@ -197,6 +198,49 @@ public void shouldNotRegisterStoreWithoutMetadata() {
             () -> changelogReader.register(new TopicPartition("ChangelogWithoutStoreMetadata", 0), stateManager));
     }
 
+    @Test
+    public void shouldSupportUnregisterChangelogBeforeCompletion() {

Review Comment:
   I can add that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #12654: KAFKA-10575: Add onRestoreSuspsnded to StateRestoreListener

2023-01-31 Thread via GitHub


guozhangwang commented on code in PR #12654:
URL: https://github.com/apache/kafka/pull/12654#discussion_r1092383808


##
streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java:
##
@@ -37,6 +37,9 @@
  * These two interfaces serve different restoration purposes and users should not try to implement both of them in a single
  * class during state store registration.
  *
+ * Also note that standby tasks restoration process are not monitored via this interface, since a standby task keep

Review Comment:
   Ack.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -988,6 +988,18 @@ public void unregister(final Collection 
revokedChangelogs) {
 if (changelogMetadata != null) {
 if 
(!changelogMetadata.state().equals(ChangelogState.REGISTERED)) {
 revokedInitializedChangelogs.add(partition);
+
+// if the changelog is still in REGISTERED, it means it 
has not initialized and started
+// restoring yet, and hence the corresponding 
onRestoreStart was not called; in this case
+// we should not call onRestorePaused either

Review Comment:
   Ouch, Ack!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-12468) Initial offsets are copied from source to target cluster

2023-01-31 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-12468:
---

Assignee: Greg Harris

> Initial offsets are copied from source to target cluster
> 
>
> Key: KAFKA-12468
> URL: https://issues.apache.org/jira/browse/KAFKA-12468
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Bart De Neuter
>Assignee: Greg Harris
>Priority: Major
>
> We have an active-passive setup where the 3 connectors from MirrorMaker 2 
> (heartbeat, checkpoint and source) are running on a dedicated Kafka Connect 
> cluster on the target cluster.
> Offset syncing is enabled as specified by KIP-545. But when activated, it 
> seems the offsets from the source cluster are initially copied to the target 
> cluster without translation. This causes a negative lag for all synced 
> consumer groups. The sync only starts working correctly once we reset the 
> offsets for each topic/partition on the target cluster and produce a record 
> on the topic/partition in the source. 
> I would expect the consumer groups to be synced, but the current offsets of 
> the source cluster not to be copied to the target cluster.
> This is the configuration we are currently using:
> Heartbeat connector
>  
> {code:xml}
> {
>   "name": "mm2-mirror-heartbeat",
>   "config": {
> "name": "mm2-mirror-heartbeat",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "1",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
> Checkpoint connector:
> {code:xml}
> {
>   "name": "mm2-mirror-checkpoint",
>   "config": {
> "name": "mm2-mirror-checkpoint",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  Source connector:
> {code:xml}
> {
>   "name": "mm2-mirror-source",
>   "config": {
> "name": "mm2-mirror-source",
> "connector.class": 
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
> "source.cluster.alias": "eventador",
> "target.cluster.alias": "msk",
> "source.cluster.bootstrap.servers": "",
> "target.cluster.bootstrap.servers": "",
> "topics": ".*",
> "groups": ".*",
> "tasks.max": "40",
> "replication.policy.class": "CustomReplicationPolicy",
> "sync.group.offsets.enabled": "true",
> "sync.group.offsets.interval.seconds": "5",
> "emit.checkpoints.enabled": "true",
> "emit.checkpoints.interval.seconds": "30",
> "emit.heartbeats.interval.seconds": "30",
> "key.converter": " 
> org.apache.kafka.connect.converters.ByteArrayConverter",
> "value.converter": 
> "org.apache.kafka.connect.converters.ByteArrayConverter"
>   }
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication

2023-01-31 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris reassigned KAFKA-12566:
---

Assignee: Greg Harris  (was: Luke Chen)

> Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
> -
>
> Key: KAFKA-12566
> URL: https://issues.apache.org/jira/browse/KAFKA-12566
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Assignee: Greg Harris
>Priority: Critical
>  Labels: flaky-test
>
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 2. 
> Offsets not translated downstream to primary cluster. ==> expected: <true> 
> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289)
> {code}
> {{LOGs}}
> {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>  at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote}
> and
> {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] 
> Graceful stop of task MirrorHeartbeatConnector-0 failed. 
> (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) 
> org.apache.kafka.common.KafkaException: Producer is closed forcefully. at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>  at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) 
> at java.lang.Thread.run(Thread.java:748) [2021-03-26 03:30:42,248] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} Failed to flush, timed out 
> while waiting for producer to flush outstanding 1 messages 
> (org.apache.kafka.connect.runtime.WorkerSourceTask:512){quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-12566) Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication

2023-01-31 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12566?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682707#comment-17682707
 ] 

Greg Harris commented on KAFKA-12566:
-

Hey [~showuon] I'm going to assign this issue to me, as I had to do some test 
stabilization in this area as part of 
[https://github.com/apache/kafka/pull/13178] .

> Flaky Test MirrorConnectorsIntegrationSSLTest#testReplication
> -
>
> Key: KAFKA-12566
> URL: https://issues.apache.org/jira/browse/KAFKA-12566
> Project: Kafka
>  Issue Type: Test
>  Components: mirrormaker, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
>  
> {code:java}
> org.opentest4j.AssertionFailedError: Condition not met within timeout 2. 
> Offsets not translated downstream to primary cluster. ==> expected: <true> 
> but was: <false> at 
> org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) at 
> org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) at 
> org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:193) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:289)
> {code}
> {{LOGs}}
> {quote}[2021-03-26 03:28:06,157] ERROR Could not check connector state info. 
> (org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions:420) 
> org.apache.kafka.connect.runtime.rest.errors.ConnectRestException: Could not 
> read connector state. Error response: \{"error_code":404,"message":"No status 
> found for connector MirrorSourceConnector"} at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster.connectorStatus(EmbeddedConnectCluster.java:479)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.checkConnectorState(EmbeddedConnectClusterAssertions.java:413)
>  at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.lambda$assertConnectorAndAtLeastNumTasksAreRunning$16(EmbeddedConnectClusterAssertions.java:286)
>  at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) at 
> org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.assertConnectorAndAtLeastNumTasksAreRunning(EmbeddedConnectClusterAssertions.java:285)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.waitUntilMirrorMakerIsRunning(MirrorConnectorsIntegrationBaseTest.java:470)
>  at 
> org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testReplication(MirrorConnectorsIntegrationBaseTest.java:227){quote}
> and
> {quote}[2021-03-26 03:30:41,524] ERROR [MirrorHeartbeatConnector|task-0] 
> Graceful stop of task MirrorHeartbeatConnector-0 failed. 
> (org.apache.kafka.connect.runtime.Worker:866) [2021-03-26 03:30:41,527] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} failed to send record to 
> heartbeats: (org.apache.kafka.connect.runtime.WorkerSourceTask:372) 
> org.apache.kafka.common.KafkaException: Producer is closed forcefully. at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortBatches(RecordAccumulator.java:750)
>  at 
> org.apache.kafka.clients.producer.internals.RecordAccumulator.abortIncompleteBatches(RecordAccumulator.java:737)
>  at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:282) 
> at java.lang.Thread.run(Thread.java:748) [2021-03-26 03:30:42,248] ERROR 
> [MirrorHeartbeatConnector|task-0] 
> WorkerSourceTask\{id=MirrorHeartbeatConnector-0} Failed to flush, timed out 
> while waiting for producer to flush outstanding 1 messages 
> (org.apache.kafka.connect.runtime.WorkerSourceTask:512){quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on pull request #13167: KAFKA-14650: Synchronize access to tasks inside task manager

2023-01-31 Thread via GitHub


guozhangwang commented on PR #13167:
URL: https://github.com/apache/kafka/pull/13167#issuecomment-1410914374

   @lucasbru You're right! I think this issue exists even before #12397.
   
   After thinking about it a bit, and looking ahead to the fact that IQ would need to 
access the Tasks plus the StateUpdater (which is already synchronized), I'll 
just synchronize on the access of the two maps instead.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 opened a new pull request, #13178: KAFKA-12468, KAFKA-14663, KAFKA-12566: Fix MM2 causing negative downstream lag

2023-01-31 Thread via GitHub


gharris1727 opened a new pull request, #13178:
URL: https://github.com/apache/kafka/pull/13178

   This PR addresses three distinct but closely related issues:
   
   1. [KAFKA-12468](https://issues.apache.org/jira/browse/KAFKA-12468) "Initial 
offsets are copied from source to target cluster" "Mirror Maker 2 Negative 
Offsets"
   2. [KAFKA-14663](https://issues.apache.org/jira/browse/KAFKA-14663) "High 
throughput topics can starve low-throughput MM2 offset syncs"
   3. [KAFKA-12566](https://issues.apache.org/jira/browse/KAFKA-12566) "Flaky 
Test MirrorConnectorsIntegrationSSLTest#testReplication"
   
   The primary issue being addressed here is the incorrect translation of 
offsets, the title issue.
   
   The [MM2 
KIP](https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0)
 does not discuss the offset translation mechanism in detail, so I'll summarize 
the mechanism as it currently exists on trunk:
   1. Records are mirrored from source topic-partition to target 
topic-partition by the MirrorSourceTask
   2. MirrorSourceTask will (occasionally) emit OffsetSync messages to an 
Offset Syncs topic. Offset syncs contain the upstream and downstream offset of 
an emitted data record.
   3. The MirrorCheckpointTask will consume from the offset syncs topic, and 
maintain an in-memory copy of the latest offset sync seen for each 
topic-partition (in OffsetSyncStore)
   4. Periodically the MirrorCheckpointTask will poll consumer group offsets 
for the source topic, and use its in-memory copy of the latest offset sync to 
translate upstream offsets to downstream offsets.
   5. This is done by measuring the 'distance' between the MM2 offset sync and 
the upstream consumer group, and then assuming that the same distance applies 
in the downstream topic (see the sketch after this list).
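   
   To make step (5) concrete, here is a hypothetical, minimal sketch of the 
distance-based translation (illustrative names only, not the actual 
MirrorCheckpointTask code):
   
   ```java
   public class NaiveOffsetTranslation {
       // Step (5): measure how far the consumer group is past the last offset sync
       // and assume the same distance applies in the downstream topic.
       static long translate(long upstreamGroupOffset, long syncUpstreamOffset, long syncDownstreamOffset) {
           long distance = upstreamGroupOffset - syncUpstreamOffset;
           // Only correct when every upstream offset has been reproduced downstream.
           return syncDownstreamOffset + distance;
       }
   
       public static void main(String[] args) {
           // Numbers from the negative-lag example below: sync (A, 500) -> (target.A, 500), cg at 1000.
           System.out.println(translate(1000, 500, 500)); // 1000, although target.A only has 500 records
       }
   }
   ```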
   
   Step (5) is correct when assuming that every *offset* from the source topic 
has already been reproduced in the downstream topic. However, this assumption 
is violated when offsets are not present, which can happen for a variety of 
reasons, including:
   1. Transaction markers take an offset but will never be emitted as records 
from the consumer
   2. Records are dropped by SMTs and will never be emitted to the target topic
   3. The source topic has been compacted and some offsets will never be 
emitted by the consumer
   4. MM2 replication is lagging behind an upstream consumer group and some 
records have not been replicated yet
   
   In any of these conditions, an upstream offset may be translated to a 
downstream offset which is beyond the corresponding record in the downstream 
topic. Consider the following concrete example of situation (4) **resulting in 
negative lag**:
   1. Source topic `A` has 1000 records, all with contiguous offsets
   2. An upstream consumer group `cg` is at the end of the log, offset 1000.
   3. MM2 begins replicating the topic, and writes 500 upstream records to the 
target topic `target.A`, and writes offset-syncs correlating (`A`, 500) with 
(`target.A`, 500).
   4. MM2 checkpoint reads `cg` offset 1000, translates the offset to 500 + 
(1000-500) = 1000, and writes to `target.cg`
   5. Someone checks the `target.cg` offset for `target.A` and observes that 
the group offset is 1000, the topic end offset is 500, and the lag is -500.
   
   And the following concrete example of situation (1) **resulting in 
undelivered data**.
   1. Source topic `A` has 1000 records, all emitted with a transactional 
producer.
   2. The 1000 records are interleaved with 1000 commit markers at every other 
offset.
   3. An upstream consumer group `cg` is in the middle of the topic, at offset 
1000.
   4. MM2 begins replicating the topic, and writes 1000 records to the target 
topic `target.A`, and writes offset-syncs correlating (`A`, 500) with 
(`target.A`, 250), in addition to other offset-syncs.
   5. MM2 checkpoint reads the `cg` offset 1000, translates the offset to 250 + 
(1000 - 500) = 750, and writes to `target.cg`
   6. A system fails-over from `cg` to `target.cg` and someone notices that the 
`cg` application read records 0-500, `target.cg` application read 750-1000, but 
no consumer ever received offsets 500-750.
   
   This PR adds a test that replicates transactional data, as in situation (1). 
It asserts that whenever an offset is translated, it does not pass the end of 
the downstream topic, and cannot cause negative lag. In addition the tests are 
strengthened to require the offset syncs to be emitted up to the end of the 
topic, requiring a fix for the offset-syncs topic starvation issue. This also 
exposed a number of mistakes and flakiness in the existing tests, so this PR 
also stabilizes the tests to make them useful for validating the negative 
offsets fix.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated 

[GitHub] [kafka] dajac commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-31 Thread via GitHub


dajac commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1092320492


##
core/src/main/scala/kafka/server/ApiVersionManager.scala:
##
@@ -86,14 +100,7 @@ class DefaultApiVersionManager(
 finalizedFeatures.features.map(kv => (kv._1, 
kv._2.asInstanceOf[java.lang.Short])).asJava,
 finalizedFeatures.epoch,
 controllerApiVersions.orNull,
-listenerType)
-  }
-
-  override def enabledApis: collection.Set[ApiKeys] = {

Review Comment:
   Replaced the method by a val.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on a diff in pull request #12972: KAFKA-14391; Add ConsumerGroupHeartbeat API

2023-01-31 Thread via GitHub


dajac commented on code in PR #12972:
URL: https://github.com/apache/kafka/pull/12972#discussion_r1092319133


##
clients/src/main/resources/common/message/ConsumerGroupHeartbeatResponse.json:
##
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 68,
+  "type": "response",
+  "name": "ConsumerGroupHeartbeatResponse",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  // Supported errors:
+  // - GROUP_AUTHORIZATION_FAILED
+  // - NOT_COORDINATOR
+  // - COORDINATOR_NOT_AVAILABLE
+  // - COORDINATOR_LOAD_IN_PROGRESS
+  // - INVALID_REQUEST
+  // - UNKNOWN_MEMBER_ID
+  // - FENCED_MEMBER_EPOCH
+  // - UNSUPPORTED_ASSIGNOR
+  // - UNRELEASED_INSTANCE_ID
+  // - GROUP_MAX_SIZE_REACHED

Review Comment:
   done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-01-31 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682678#comment-17682678
 ] 

Ismael Juma commented on KAFKA-14661:
-

3.6.x supported zk 3.4 clients, so this change has compatibility impact - i.e. 
we probably need a KIP. The second question is: what server versions are 
supported by 3.8.x clients?

> Upgrade Zookeeper to 3.8.1 
> ---
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> Current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1]
> Users of Kafka are facing regulatory hurdles because of using a dependency 
> which is EOL, hence, I would suggest to upgrade this in all upcoming releases 
> (including patch releases of 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients. That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-01-31 Thread via GitHub


guozhangwang commented on PR #13025:
URL: https://github.com/apache/kafka/pull/13025#issuecomment-1410815034

   ack, will take a look asap.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


C0urante commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092133488


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -476,14 +479,22 @@ public  List getConfiguredInstances(List 
classNames, Class t, M
 return objects;
 Map configPairs = originals();
 configPairs.putAll(configOverrides);
-for (Object klass : classNames) {
-Object o = getConfiguredInstance(klass, t, configPairs);
-objects.add(t.cast(o));
+
+try {
+for (Object klass : classNames) {
+Object o = getConfiguredInstance(klass, t, configPairs);
+objects.add(t.cast(o));
+}
+} catch (Exception e) {
+for (Object object : objects) {
+Utils.closeQuietly((AutoCloseable) object, "AutoCloseable 
object constructed and configured during failed call to 
getConfiguredInstances");

Review Comment:
   We still need an `instanceof` check here:
   ```suggestion
   if (object instanceof AutoCloseable) {
   Utils.closeQuietly((AutoCloseable) object, 
"AutoCloseable object constructed and configured during failed call to 
getConfiguredInstances");
   }
   ```



##
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java:
##
@@ -137,6 +137,7 @@
 import static org.mockito.Mockito.when;
 
 public class KafkaProducerTest {
+private final int targetInterceptor = 3;

Review Comment:
   Nit: If we're not using this field in any other test, probably makes more 
sense to inline directly into the test case it's used in.



##
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##
@@ -168,6 +169,7 @@ public class KafkaConsumerTest {
 private final int defaultApiTimeoutMs = 6;
 private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
 private final int heartbeatIntervalMs = 1000;
+private final int targetInterceptor = 3;

Review Comment:
   Nit: If we're not using this field in any other test, probably makes more 
sense to inline directly into the test case it's used in.



##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -476,14 +479,22 @@ public  List getConfiguredInstances(List 
classNames, Class t, M
 return objects;
 Map configPairs = originals();
 configPairs.putAll(configOverrides);
-for (Object klass : classNames) {
-Object o = getConfiguredInstance(klass, t, configPairs);
-objects.add(t.cast(o));
+
+try {
+for (Object klass : classNames) {
+Object o = getConfiguredInstance(klass, t, configPairs);
+objects.add(t.cast(o));
+}
+} catch (Exception e) {
+for (Object object : objects) {
+Utils.closeQuietly((AutoCloseable) object, "AutoCloseable 
object constructed and configured during failed call to 
getConfiguredInstances");

Review Comment:
   Actually, come to think of it, we might also want to invoke `close` on 
objects created in `getConfiguredInstance` if they throw an exception from 
`configure` 
[here](https://github.com/apache/kafka/blob/6c98544a964b40ede6bbe1b3440f8e5db96a4ad6/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L407).
   
   Perhaps we could pull this out into a reusable method and use it both here 
and there? Thinking something like:
   
   ```java
   private static void maybeClose(Object object, String name) {
       if (object instanceof AutoCloseable)
           Utils.closeQuietly((AutoCloseable) object, name);
   }
   ```



##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -45,6 +47,8 @@
 
 public class AbstractConfigTest {
 
+
+

Review Comment:
   We don't need this change; it's fine as-is.



##
clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java:
##
@@ -54,6 +58,12 @@ public void testConfiguredInstances() {
 
testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
 testInvalidInputs("test1,test2");
 
testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_CONSUMER_INTERCEPTOR,  
org.apache.kafka.test.MockConsumerInterceptor.class);
+testInvalidInputs(TestInterceptorConfig.INTERCEPTOR_CLASSES_CONFIG, 
TestInterceptorConfig.ORG_APACHE_KAFKA_TEST_MOCK_PRODUCER_INTERCEPTOR + ", "
++ 
TestInterceptorConfig.ORG_APACHE_KAF

[GitHub] [kafka] lucasbru commented on pull request #13025: KAFKA-14299: Fix pause and resume with state updater

2023-01-31 Thread via GitHub


lucasbru commented on PR #13025:
URL: https://github.com/apache/kafka/pull/13025#issuecomment-1410798829

   @guozhangwang also wanted to have a look (note that I didn't address Brunos 
comments yet)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

2023-01-31 Thread via GitHub


mimaison commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1088957590


##
checkstyle/import-control.xml:
##
@@ -407,7 +407,8 @@
 
 
 
-
+
+

Review Comment:
   Can we keep the trailing spaces like all other entries?



##
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##
@@ -1033,7 +1033,7 @@ object ConsumerGroupCommand extends Logging {
 val describeOpt = parser.accepts("describe", DescribeDoc)
 val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc)
 val deleteOpt = parser.accepts("delete", DeleteDoc)
-val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc)
+val timeoutMsOpt: OptionSpec[Long] = parser.accepts("timeout", 
TimeoutMsDoc)

Review Comment:
   Why is this PR touching this file?



##
tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java:
##
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+
+import java.lang.management.ManagementFactory;
+import java.net.ServerSocket;
+import java.rmi.registry.LocateRegistry;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Tag("integration")
+public class JmxCommandTest {
+private final ToolsTestUtils.MockExitProcedure exitProcedure = new 
ToolsTestUtils.MockExitProcedure();
+
+private static JMXConnectorServer jmxAgent;
+private static String jmxUrl;
+
+@BeforeAll
+public static void beforeAll() throws Exception {
+int port = findRandomOpenPortOnAllLocalInterfaces();
+jmxAgent = startJmxAgent(port);
+jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", 
port);
+}
+
+@AfterAll
+public static void afterAll() throws Exception {
+jmxAgent.stop();
+}
+
+@BeforeEach
+public void beforeEach() {
+Exit.setExitProcedure(exitProcedure);
+}
+
+@AfterEach
+public void afterEach() {
+Exit.resetExitProcedure();
+}
+
+@Test
+public void kafkaVersion() {
+String out = executeAndGetOut("--version");
+assertNormalExit();
+assertEquals(AppInfoParser.getVersion(), out);
+}
+
+@Test
+public void unrecognizedOption() {
+String err = executeAndGetErr("--foo");
+assertCommandFailure();
+assertTrue(err.contains("UnrecognizedOptionException"));
+assertTrue(err.contains("foo"));
+}
+
+@Test
+public void missingRequired() {
+String err = executeAndGetErr("--reporting-interval");
+assertCommandFailure();
+assertTrue(err.contains("OptionMissingRequiredArgumentException"));
+assertTrue(err.contains("reporting-interval"));
+}
+
+@Test
+public void invalidJmxUrl() {
+String err = executeAndGetErr("--jmx-url", 
String.format("localhost:"));

Review Comment:
   We don't need `String.format()` here



##
tools/src/main/java/org/apache/kafka/tools/JmxTool.java:
##
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+

[jira] [Updated] (KAFKA-14664) Raft idle ratio is inaccurate

2023-01-31 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-14664:

Description: 
The `poll-idle-ratio-avg` metric is intended to track how idle the raft IO 
thread is. When completely idle, it should measure 1. When saturated, it should 
measure 0. The problem with the current measurements is that they are treated 
equally with respect to time. For example, say we poll twice with the following 
durations:

Poll 1: 2s

Poll 2: 0s

Assume that the busy time is negligible, so 2s passes overall.

In the first measurement, 2s is spent waiting, so we compute and record a ratio 
of 1.0. In the second measurement, no time passes, and we record 0.0. The idle 
ratio is then computed as the average of these two values ((1.0 + 0.0) / 2 = 
0.5), which suggests that the process was busy for 1s, which overestimates the 
true busy time.

Instead, we should sum up the time waiting over the full interval. 2s passes 
total here and 2s is idle, so we should compute 1.0.

  was:
The `poll-idle-ratio-avg` metric is intended to track how idle the raft IO 
thread is. When completely idle, it should measure 1. When saturated, it should 
measure 0. The problem with the current measurements is that they are treated 
equally with respect to time. For example, say we poll twice with the following 
durations:

Poll 1: 2s

Poll 2: 0s

Assume that the busy time is negligible, so 2s passes overall.

In the first measurement, 2s is spent waiting, so we compute and record a ratio 
of 1.0. In the second measurement, no time passes, and we record 0.0. The idle 
ratio is then computed as the average of these two values ((1.0 + 0.0) / 2 = 
0.5), which suggests that the process was busy for 1s. 

Instead, we should sum up the time waiting over the full interval. 2s passes 
total here and 2s is idle, so we should compute 1.0.


> Raft idle ratio is inaccurate
> -
>
> Key: KAFKA-14664
> URL: https://issues.apache.org/jira/browse/KAFKA-14664
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> The `poll-idle-ratio-avg` metric is intended to track how idle the raft IO 
> thread is. When completely idle, it should measure 1. When saturated, it 
> should measure 0. The problem with the current measurements is that they are 
> treated equally with respect to time. For example, say we poll twice with the 
> following durations:
> Poll 1: 2s
> Poll 2: 0s
> Assume that the busy time is negligible, so 2s passes overall.
> In the first measurement, 2s is spent waiting, so we compute and record a 
> ratio of 1.0. In the second measurement, no time passes, and we record 0.0. 
> The idle ratio is then computed as the average of these two values ((1.0 + 0.0) 
> / 2 = 0.5), which suggests that the process was busy for 1s, which 
> overestimates the true busy time.
> Instead, we should sum up the time waiting over the full interval. 2s passes 
> total here and 2s is idle, so we should compute 1.0.
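
As a purely illustrative sketch (not the actual Raft metrics code), the difference 
between the two computations for the example above:

{code:java}
public class IdleRatioExample {
    public static void main(String[] args) {
        // Poll 1 waits 2s, poll 2 waits 0s; busy time is negligible.
        double[] idleMs  = {2000.0, 0.0};  // time spent waiting in each poll
        double[] totalMs = {2000.0, 0.0};  // wall-clock time covered by each poll

        // Current behaviour: average the per-poll ratios. The zero-length poll
        // records 0.0, so the result is (1.0 + 0.0) / 2 = 0.5.
        double perPollAverage = (ratio(idleMs[0], totalMs[0]) + ratio(idleMs[1], totalMs[1])) / 2;

        // Suggested fix: weight by time, i.e. total idle time over total elapsed time.
        double timeWeighted = (idleMs[0] + idleMs[1]) / (totalMs[0] + totalMs[1]);

        System.out.println(perPollAverage); // 0.5, overestimates busy time
        System.out.println(timeWeighted);   // 1.0, the thread was in fact idle
    }

    private static double ratio(double idle, double total) {
        return total == 0 ? 0.0 : idle / total;
    }
}
{code}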



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on pull request #13164: MINOR: Fix scaladoc warnings

2023-01-31 Thread via GitHub


guozhangwang commented on PR #13164:
URL: https://github.com/apache/kafka/pull/13164#issuecomment-1410744946

   LGTM, Merged to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #13164: MINOR: Fix scaladoc warnings

2023-01-31 Thread via GitHub


guozhangwang merged PR #13164:
URL: https://github.com/apache/kafka/pull/13164


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #13120: MINOR: Connect Javadocs improvements

2023-01-31 Thread via GitHub


yashmayya commented on code in PR #13120:
URL: https://github.com/apache/kafka/pull/13120#discussion_r1092188566


##
connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigOverridePolicy.java:
##
@@ -23,25 +23,25 @@
 import java.util.List;
 
 /**
- * An interface for enforcing a policy on overriding of client configs via 
the connector configs.
- *
- * Common use cases are ability to provide principal per connector, 
sasl.jaas.config
+ * An interface for enforcing a policy on overriding of Kafka client configs 
via the connector configs.
+ * 
+ * Common use cases are ability to provide principal per connector, 
sasl.jaas.config
  * and/or enforcing that the producer/consumer configurations for 
optimizations are within acceptable ranges.
  */
 public interface ConnectorClientConfigOverridePolicy extends Configurable, 
AutoCloseable {
 
 
 /**
- * Worker will invoke this while constructing the producer for the 
SourceConnectors,  DLQ for SinkConnectors and the consumer for the
- * SinkConnectors to validate if all of the overridden client 
configurations are allowed per the
- * policy implementation. This would also be invoked during the validation 
of connector configs via the Rest API.
- *
+ * Workers will invoke this while constructing producer for 
SourceConnectors, DLQs for SinkConnectors and
+ * consumers for SinkConnectors to validate if all of the overridden 
client configurations are allowed per the

Review Comment:
   Makes sense



##
connect/api/src/main/java/org/apache/kafka/connect/connector/policy/ConnectorClientConfigRequest.java:
##
@@ -44,25 +44,25 @@ public ConnectorClientConfigRequest(
 }
 
 /**
- * Provides Config with prefix {@code producer.override.} for {@link 
ConnectorType#SOURCE}.
- * Provides Config with prefix {@code consumer.override.} for {@link 
ConnectorType#SINK}.
- * Provides Config with prefix {@code producer.override.} for {@link 
ConnectorType#SINK} for DLQ.
- * Provides Config with prefix {@code admin.override.} for {@link 
ConnectorType#SINK} for DLQ.
+ * Provides Config with prefix "{@code producer.override.}" for {@link 
ConnectorType#SOURCE}.
+ * Provides Config with prefix "{@code consumer.override.}" for {@link 
ConnectorType#SINK}.
+ * Provides Config with prefix "{@code producer.override.}" for {@link 
ConnectorType#SINK} for DLQ.
+ * Provides Config with prefix "{@code admin.override.}" for {@link 
ConnectorType#SINK} for DLQ.

Review Comment:
   Thanks, that's a great call out!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] jeqo opened a new pull request, #13177: [KAFKA-14441] Benchmark performance impact of metrics library

2023-01-31 Thread via GitHub


jeqo opened a new pull request, #13177:
URL: https://github.com/apache/kafka/pull/13177

   https://issues.apache.org/jira/browse/KAFKA-14441
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-01-31 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682648#comment-17682648
 ] 

Matthias J. Sax commented on KAFKA-14660:
-

The original PR did not make sense: if totalCapacity were really zero, that 
would indicate a bug, and just setting it to 1 does not sound right. I already 
merged a new PR that raises an exception for this case, and thus avoids the 
divide-by-zero. This should resolve the issue.

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 3.5.0
>
>
> Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422]([https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven&component-name=org.apache.kafka%2Fkafka-streams&utm_source=ossindex-client&utm_medium=integration&utm_content=1.7.0)]
>  * [Original PR]([https://github.com/apache/kafka/pull/7414])
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that its now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (Again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14584) Move StateChangeLogMerger to tools

2023-01-31 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14584?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682643#comment-17682643
 ] 

Federico Valeri commented on KAFKA-14584:
-

Users have to migrate from "kafka-run-class.sh 
kafka.tools.StateChangeLogMerger" to "kafka-run-class.sh 
org.apache.kafka.tools.StateChangeLogMerger".

> Move StateChangeLogMerger to tools
> --
>
> Key: KAFKA-14584
> URL: https://issues.apache.org/jira/browse/KAFKA-14584
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14584) Move StateChangeLogMerger to tools

2023-01-31 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14584?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14584:

Fix Version/s: 3.5.0

> Move StateChangeLogMerger to tools
> --
>
> Key: KAFKA-14584
> URL: https://issues.apache.org/jira/browse/KAFKA-14584
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14582) Move JmxTool to tools

2023-01-31 Thread Federico Valeri (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Federico Valeri updated KAFKA-14582:

Fix Version/s: 3.5.0

> Move JmxTool to tools
> -
>
> Key: KAFKA-14582
> URL: https://issues.apache.org/jira/browse/KAFKA-14582
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
> Fix For: 3.5.0
>
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14582) Move JmxTool to tools

2023-01-31 Thread Federico Valeri (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682641#comment-17682641
 ] 

Federico Valeri commented on KAFKA-14582:
-

Users have to migrate from "kafka-run-class.sh kafka.tools.JmxTool" to 
"kafka-run-class.sh org.apache.kafka.tools.JmxTool".

> Move JmxTool to tools
> -
>
> Key: KAFKA-14582
> URL: https://issues.apache.org/jira/browse/KAFKA-14582
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Federico Valeri
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop

2023-01-31 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reopened KAFKA-14553:
--

> RecordAccumulator hangs in infinite NOP loop
> 
>
> Key: KAFKA-14553
> URL: https://issues.apache.org/jira/browse/KAFKA-14553
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.1
> Environment: - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> Versions of dependencies are defined in boms of SB and SC:
> - micrometer-tracing-bridge-brave 1.0.0
> - zipkin-reporter-brave 2.16.3
> - zipkin-sender-kafka 2.16.3
>Reporter: Viczai Gábor
>Assignee: Luke Chen
>Priority: Minor
> Fix For: 3.4.0, 3.3.2
>
>
> *Summary:*
> There is an infinite loop in RecordAccumulator, if stickyBatchSize is 
> configured to be 0 in BuiltinPartitioner.
> (Which is the default case when using KafkaSender's default Builder.)
> *Details:*
> The infinite loop is caused by this while:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
> and this continue particularly:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
> because the partitionChanged() call in the condition always returns true if 
> batchSize is 0.
> So program flow never reaches this point:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
> Thus no span data sent to Kafka ever.
> The problematic line in partitionChanged() is when it calls an update on the 
> BuiltInPartitioner:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
> which in fact always updates the partition because of this condition:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> therefore the next condition in RecordAccumulator will also evaluate to true:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
> thus returning 'true' and forcing the 'continue' in the while(true) loop.
> *Suggested fix:*
> I think these conditions should be changed:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> The equal signs should be removed from the conditions:
> {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
> stickyBatchSize * 2) {{code}
> (Btw: line 213 also needs this modification.)
> *Note:*
> The problem arises because KafkaSender sets the batchSize to 0.
> https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88
> *Workaround:*
> Simply set the batch size greater than zero.
> {code:java}@Configuration
> public class SenderConfiguration {
>     @Bean
>     KafkaSender kafkaSender() {
>         Properties overrides = new Properties();
>         overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
>         return KafkaSender.newBuilder()
>             .bootstrapServers("localhost:9092")
>             .topic("zipkin")
>             .overrides(overrides)
>             .build();
>     }
> }{code}
> *Using:*
> - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> pom.xml (fragment):
> {code:xml}
>         <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-autoconfigure</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-starter-actuator</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-registry-prometheus</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-tracing-bridge-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-reporter-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-sender-kafka</artifactId>
>         </dependency>{code}
> Everything is on default settings, except a KafkaSender is explicitly 
> created as illustrated above. (No autoconfiguration available for Kafka 
> sender.)
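
For illustration, a tiny hypothetical sketch of why the condition described above is 
always true when stickyBatchSize is 0 (the expression paraphrases the 
BuiltInPartitioner check quoted in the suggested fix, with the equal signs still in 
place; everything else is made up for the example):

{code:java}
public class StickyBatchSizeExample {
    public static void main(String[] args) {
        int stickyBatchSize = 0;      // what a batch.size of 0 ends up as
        boolean enableSwitch = true;
        for (int producedBytes = 0; producedBytes <= 2; producedBytes++) {
            // With stickyBatchSize == 0 the ">=" comparisons are always true, so the
            // partition is "switched" on every append and RecordAccumulator keeps
            // taking the 'continue' branch in its while(true) loop.
            boolean switchPartition = (producedBytes >= stickyBatchSize && enableSwitch)
                    || producedBytes >= stickyBatchSize * 2;
            System.out.println(producedBytes + " bytes -> switch = " + switchPartition); // always true
        }
    }
}
{code}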



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop

2023-01-31 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-14553:
-
Fix Version/s: (was: 3.4.0)

> RecordAccumulator hangs in infinite NOP loop
> 
>
> Key: KAFKA-14553
> URL: https://issues.apache.org/jira/browse/KAFKA-14553
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.1
> Environment: - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> Versions of dependencies are defined in boms of SB and SC:
> - micrometer-tracing-bridge-brave 1.0.0
> - zipkin-reporter-brave 2.16.3
> - zipkin-sender-kafka 2.16.3
>Reporter: Viczai Gábor
>Assignee: Luke Chen
>Priority: Minor
> Fix For: 3.3.2
>
>
> *Summary:*
> There is an infinite loop in RecordAccumulator, if stickyBatchSize is 
> configured to be 0 in BuiltinPartitioner.
> (Which is the default case when using KafkaSender's default Builder.)
> *Details:*
> The infinite loop is caused by this while:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
> and this continue particularly:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
> because the partitionChanged() call in the condition always returns true if 
> batchSize is 0.
> So program flow never reaches this point:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
> Thus no span data sent to Kafka ever.
> The problematic line in partitionChanged() is when it calls an update on the 
> BuiltInPartitioner:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
> which in fact always updates the partition because of this condition:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> therefore the next condition in RecordAccumulator will also evaluate to true:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
> thus returning 'true' and forcing the 'continue' in the while(true) loop.
> *Suggested fix:*
> I think these conditions should be changed:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> The equal signs should be removed from the conditions:
> {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
> stickyBatchSize * 2) {{code}
> (Btw: line 213 also needs this modification.)
> *Note:*
> The problem arises because KafkaSender sets the batchSize to 0.
> https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88
> *Workaround:*
> Simply set the batch size greater than zero.
> {code:java}@Configuration
> public class SenderConfiguration {
>     @Bean
>     KafkaSender kafkaSender() {
>         Properties overrides = new Properties();
>         overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
>         return KafkaSender.newBuilder()
>             .bootstrapServers("localhost:9092")
>             .topic("zipkin")
>             .overrides(overrides)
>             .build();
>     }
> }{code}
> *Using:*
> - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> pom.xml (fragment):
> {code:xml}
>         <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-autoconfigure</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>org.springframework.boot</groupId>
>             <artifactId>spring-boot-starter-actuator</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-registry-prometheus</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.micrometer</groupId>
>             <artifactId>micrometer-tracing-bridge-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-reporter-brave</artifactId>
>         </dependency>
>         <dependency>
>             <groupId>io.zipkin.reporter2</groupId>
>             <artifactId>zipkin-sender-kafka</artifactId>
>         </dependency>{code}
> Everything is on default settings, except that a KafkaSender is explicitly 
> created as illustrated above. (No autoconfiguration is available for the Kafka 
> sender.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop

2023-01-31 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14553.
--
Resolution: Duplicate

> RecordAccumulator hangs in infinite NOP loop
> 
>
> Key: KAFKA-14553
> URL: https://issues.apache.org/jira/browse/KAFKA-14553
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.1
> Environment: - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> Versions of dependencies are defined in boms of SB and SC:
> - micrometer-tracing-bridge-brave 1.0.0
> - zipkin-reporter-brave 2.16.3
> - zipkin-sender-kafka 2.16.3
>Reporter: Viczai Gábor
>Assignee: Luke Chen
>Priority: Minor
> Fix For: 3.4.0, 3.3.2
>
>
> *Summary:*
> There is an infinite loop in RecordAccumulator if stickyBatchSize is 
> configured to be 0 in BuiltInPartitioner.
> (Which is the default case when using KafkaSender's default Builder.)
> *Details:*
> The infinite loop is caused by this while loop:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
> and this continue statement in particular:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
> because the partitionChanged() call in the condition always returns true if 
> batchSize is 0.
> So program flow never reaches this point:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
> Thus no span data is ever sent to Kafka.
> The problematic line in partitionChanged() is the call that updates the 
> BuiltInPartitioner:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
> which in fact always updates the partition because of this condition:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> therefore the next condition in RecordAccumulator also evaluates to true:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
> thus returning 'true' and forcing the 'continue' in the while(true) loop.
> *Suggested fix:*
> I think these conditions should be changed:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> The equal signs should be removed from the conditions:
> {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
> stickyBatchSize * 2) {{code}
> (Btw: line 213 also needs this modification.)
> *Note:*
> The problem arises because KafkaSender sets the batchSize to 0.
> https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88
> *Workaround:*
> Simply set the batch size greater than zero.
> {code:java}@Configuration
> public class SenderConfiguration {
>     @Bean
>     KafkaSender kafkaSender() {
>         Properties overrides = new Properties();
>         overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
>         return KafkaSender.newBuilder()
>             .bootstrapServers("localhost:9092")
>             .topic("zipkin")
>             .overrides(overrides)
>             .build();
>     }
> }{code}
> *Using:*
> - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> pom.xml (fragment):
> {code:xml}
> <dependency>
>     <groupId>org.springframework.boot</groupId>
>     <artifactId>spring-boot-autoconfigure</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.springframework.boot</groupId>
>     <artifactId>spring-boot-starter-actuator</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.micrometer</groupId>
>     <artifactId>micrometer-registry-prometheus</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.micrometer</groupId>
>     <artifactId>micrometer-tracing-bridge-brave</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.zipkin.reporter2</groupId>
>     <artifactId>zipkin-reporter-brave</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.zipkin.reporter2</groupId>
>     <artifactId>zipkin-sender-kafka</artifactId>
> </dependency>{code}
> Everything is on default settings, except that a KafkaSender is explicitly 
> created as illustrated above. (No autoconfiguration is available for the Kafka 
> sender.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (KAFKA-14553) RecordAccumulator hangs in infinite NOP loop

2023-01-31 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14553?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur closed KAFKA-14553.

Assignee: Luke Chen

> RecordAccumulator hangs in infinite NOP loop
> 
>
> Key: KAFKA-14553
> URL: https://issues.apache.org/jira/browse/KAFKA-14553
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.1
> Environment: - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> Versions of dependencies are defined in boms of SB and SC:
> - micrometer-tracing-bridge-brave 1.0.0
> - zipkin-reporter-brave 2.16.3
> - zipkin-sender-kafka 2.16.3
>Reporter: Viczai Gábor
>Assignee: Luke Chen
>Priority: Minor
> Fix For: 3.4.0, 3.3.2
>
>
> *Summary:*
> There is an infinite loop in RecordAccumulator if stickyBatchSize is 
> configured to be 0 in BuiltInPartitioner.
> (Which is the default case when using KafkaSender's default Builder.)
> *Details:*
> The infinite loop is caused by this while loop:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L293
> and this continue statement in particular:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L316
> because the partitionChanged() call in the condition always returns true if 
> batchSize is 0.
> So program flow never reaches this point:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L318
> Thus no span data is ever sent to Kafka.
> The problematic line in partitionChanged() is the call that updates the 
> BuiltInPartitioner:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L242
> which in fact always updates the partition because of this condition:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> therefore the next condition in RecordAccumulator also evaluates to true:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L243
> thus returning 'true' and forcing the 'continue' in the while(true) loop.
> *Suggested fix:*
> I think these conditions should be changed:
> https://github.com/apache/kafka/blob/3.3.1/clients/src/main/java/org/apache/kafka/clients/producer/internals/BuiltInPartitioner.java#L218
> The equal signs should be removed from the conditions:
> {code}if (producedBytes > stickyBatchSize && enableSwitch || producedBytes > 
> stickyBatchSize * 2) {{code}
> (Btw: line 213 also needs this modification.)
> *Note:*
> The problem arises because KafkaSender sets the batchSize to 0.
> https://github.com/openzipkin/zipkin-reporter-java/blob/2.16.3/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java#L88
> *Workaround:*
> Simply set the batch size greater than zero.
> {code:java}@Configuration
> public class SenderConfiguration {
>     @Bean
>     KafkaSender kafkaSender() {
>         Properties overrides = new Properties();
>         overrides.put(ProducerConfig.BATCH_SIZE_CONFIG, 1);
>         return KafkaSender.newBuilder()
>             .bootstrapServers("localhost:9092")
>             .topic("zipkin")
>             .overrides(overrides)
>             .build();
>     }
> }{code}
> *Using:*
> - Spring Boot 3.0.1
> - Spring Cloud 2022.0.0
> pom.xml (fragment):
> {code:xml}
> <dependency>
>     <groupId>org.springframework.boot</groupId>
>     <artifactId>spring-boot-autoconfigure</artifactId>
> </dependency>
> <dependency>
>     <groupId>org.springframework.boot</groupId>
>     <artifactId>spring-boot-starter-actuator</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.micrometer</groupId>
>     <artifactId>micrometer-registry-prometheus</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.micrometer</groupId>
>     <artifactId>micrometer-tracing-bridge-brave</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.zipkin.reporter2</groupId>
>     <artifactId>zipkin-reporter-brave</artifactId>
> </dependency>
> <dependency>
>     <groupId>io.zipkin.reporter2</groupId>
>     <artifactId>zipkin-sender-kafka</artifactId>
> </dependency>{code}
> Everything is on default settings, except that a KafkaSender is explicitly 
> created as illustrated above. (No autoconfiguration is available for the Kafka 
> sender.)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-01-31 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682621#comment-17682621
 ] 

Divij Vaidya edited comment on KAFKA-14661 at 1/31/23 3:43 PM:
---

From Zk 3.8.1 release notes [1]
 - ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x 
servers. 

Apache Kafka (AK) upgraded to Zk v3.5.x starting with the AK v2.4.0 release [2], 
which means that clients from Apache Kafka 2.4.0 onwards will be able to 
communicate with servers running Zk 3.8.x.

[1] [https://zookeeper.apache.org/releases.html] 
[2] https://issues.apache.org/jira/browse/KAFKA-8634 


was (Author: divijvaidya):
From Zk 3.8.1 release notes [1]
 - ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x 
servers. 

Apache Kafka upgraded to Zk v3.5.x starting 2.4.0 release [2] which means that 
clients starting Apache Kafka 2.4.0 will be able to communicate with servers 
running 3.8.x Zk.

[1] [https://zookeeper.apache.org/releases.html] 
[2] https://issues.apache.org/jira/browse/KAFKA-8634 

> Upgrade Zookeeper to 3.8.1 
> ---
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> The current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1].
> Users of Kafka are facing regulatory hurdles because of depending on an EOL 
> component, hence I would suggest upgrading it in all upcoming releases 
> (including patch releases of the 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed out by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients? That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-01-31 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682621#comment-17682621
 ] 

Divij Vaidya edited comment on KAFKA-14661 at 1/31/23 3:43 PM:
---

From Zk 3.8.1 release notes [1]
 - ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x 
servers. 

Apache Kafka upgraded to Zk v3.5.x starting 2.4.0 release [2] which means that 
clients starting Apache Kafka 2.4.0 will be able to communicate with servers 
running 3.8.x Zk.

[1] [https://zookeeper.apache.org/releases.html] 
[2] https://issues.apache.org/jira/browse/KAFKA-8634 


was (Author: divijvaidya):
From Zk 3.8.1 release notes [1]
- ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x servers. 

Apache Kafka upgraded to 3.5.x starting 2.4.0 release [2] which means that 
clients starting Apache Kafka 2.4.0 will be able to communicate with servers 
running 3.8.x Zk.


[1] [https://zookeeper.apache.org/releases.html] 
[2] https://issues.apache.org/jira/browse/KAFKA-8634 

> Upgrade Zookeeper to 3.8.1 
> ---
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> The current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1].
> Users of Kafka are facing regulatory hurdles because of depending on an EOL 
> component, hence I would suggest upgrading it in all upcoming releases 
> (including patch releases of the 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed out by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients? That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14661) Upgrade Zookeeper to 3.8.1

2023-01-31 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682621#comment-17682621
 ] 

Divij Vaidya commented on KAFKA-14661:
--

From Zk 3.8.1 release notes [1]
- ZooKeeper clients from 3.5.x onwards are fully compatible with 3.8.x servers. 

Apache Kafka upgraded to 3.5.x starting 2.4.0 release [2] which means that 
clients starting Apache Kafka 2.4.0 will be able to communicate with servers 
running 3.8.x Zk.


[1] [https://zookeeper.apache.org/releases.html] 
[2] https://issues.apache.org/jira/browse/KAFKA-8634 

> Upgrade Zookeeper to 3.8.1 
> ---
>
> Key: KAFKA-14661
> URL: https://issues.apache.org/jira/browse/KAFKA-14661
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Reporter: Divij Vaidya
>Assignee: Christo Lolov
>Priority: Blocker
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> The current Zk version (3.6.x) supported by Apache Kafka has been EOL since 
> December 2022 [1].
> Users of Kafka are facing regulatory hurdles because of depending on an EOL 
> component, hence I would suggest upgrading it in all upcoming releases 
> (including patch releases of the 3.3.x and 3.4.x versions).
> Some things to consider while upgrading (as pointed out by [~ijuma] at [2]):
>  # If we upgrade the zk server to 3.8.1, what is the impact on the zk 
> clients? That is, what's the earliest zk client version that is supported by 
> the 3.8.x server?
>  # We need to ensure there are no regressions (particularly on the stability 
> front) when it comes to this upgrade. It would be good for someone to stress 
> test the system a bit with the new version and check if all works well.
> [1] [https://zookeeper.apache.org/releases.html] 
>  [2][https://github.com/apache/kafka/pull/12620#issuecomment-1409028650] 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-31 Thread Jochen Schalanda (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17682618#comment-17682618
 ] 

Jochen Schalanda commented on KAFKA-14646:
--

[~mjsax] Unfortunately we already "solved" the issue by recreating the 
respective topologies with new internal topic names, so we cannot try out your 
suggestion.

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
> Key: KAFKA-14646
> URL: https://issues.apache.org/jira/browse/KAFKA-14646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.0, 3.3.1, 3.3.2
> Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>Reporter: Jochen Schalanda
>Priority: Major
> Fix For: 3.4.0, 3.3.3
>
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After a quick look through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again, because the internal topics might 
> already have been updated to use the "new" version of 
> {{SubscriptionWrapper}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] divijvaidya commented on pull request #13078: KAFKA-13999: Add ProducerCount metrics (KIP-847)

2023-01-31 Thread via GitHub


divijvaidya commented on PR #13078:
URL: https://github.com/apache/kafka/pull/13078#issuecomment-1410523645

   @anatasiavela the proposal (using a ConcurrentHashMap for producers) impacts 
this change. Let me try to explain why.
   
   Currently you are using `producerIdCount` and updating it on every 
add/delete to the `producers` map. The alternative was to simply call 
`producers.size()` whenever we want to compute the metric. But you didn't 
choose this path because `producers` is accessed by multiple threads at the 
same time and we usually guard access to it via a lock. Hence, if you wanted to 
call `producers.size()` you would have to acquire a lock, and that is not 
optimal for a mere metric calculation, so your approach made sense. BUT if 
you change `producers` to a ConcurrentHashMap, you no longer have to acquire 
a lock to call `producers.size()` when computing the metric. Without acquiring 
a lock, the risk is that another thread may be mutating the `producers` map at 
the same time; as per the discussion above in the PR, that is acceptable to us. 
Hence, you can simply change `producers` to a ConcurrentHashMap and remove 
the logic that updates `producerIdCount` on every add/remove. This simplifies 
the code greatly.
   
   Let me know if that makes sense.
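   A minimal sketch of that suggestion, with hypothetical names (not the actual 
broker classes), just to show that `size()` on a `ConcurrentHashMap` can back the 
metric without any extra bookkeeping:
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   
   // Hypothetical holder class, for illustration only.
   class ProducerRegistry {
       // A ConcurrentHashMap lets the metrics thread read size() safely
       // without taking the lock that guards structural changes.
       private final ConcurrentMap<Long, Object> producers = new ConcurrentHashMap<>();
   
       void add(long producerId, Object state) {
           producers.put(producerId, state);   // no separate producerIdCount to maintain
       }
   
       void remove(long producerId) {
           producers.remove(producerId);
       }
   
       // Metric callback: a weakly consistent count is acceptable here.
       int producerIdCount() {
           return producers.size();
       }
   }
   ```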


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-14656) Brokers rejecting LISR during ZK migration

2023-01-31 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14656?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur resolved KAFKA-14656.
--
Resolution: Fixed

> Brokers rejecting LISR during ZK migration
> --
>
> Key: KAFKA-14656
> URL: https://issues.apache.org/jira/browse/KAFKA-14656
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.4.0
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Blocker
> Fix For: 3.4.0
>
>
> During the ZK migration, the KRaft controller sends controller RPCs to the ZK 
> brokers (LISR, UMR, etc). Since the migration can begin immediately after a 
> ZK broker starts up with migration enabled, it is possible that this broker 
> is not seen as alive by the rest of the brokers. This is due to the KRaft 
> controller taking over before the ZK controller can send out UMR with the 
> restarted broker.
>  
> The result is that parts of the LISR sent by KRaft immediately after the 
> metadata migration are rejected by brokers due to the leader being offline. 
>  
> The fix for this is to send an UMR to all brokers after the migration with 
> the set of alive brokers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

2023-01-31 Thread via GitHub


Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1092035905


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -538,6 +544,8 @@ protected Map getNumPartitions(final 
Set topics,
 tempUnknownTopics.add(topicName);
 log.debug("The leader of topic {} is not available.\n" +
 "Error message was: {}", topicName, cause.toString());
+} else if (cause instanceof TimeoutException) {
+throw new RuntimeException();

Review Comment:
   Thank you -- removed the exception and the existing block, and replaced them 
with the correct behavior.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

2023-01-31 Thread via GitHub


Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1092034974


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -466,7 +469,10 @@ public Set makeReady(final Map topics) {
 topicName)
 );
 }
-} else {
+} else if (cause instanceof  TimeoutException) 
{

Review Comment:
   ah -- that's right. done. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

2023-01-31 Thread via GitHub


Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1092027837


##
clients/src/main/java/org/apache/kafka/common/internals/KafkaFutureImpl.java:
##
@@ -160,7 +160,7 @@ private void maybeThrowCancellationException(Throwable 
cause) {
  * Waits if necessary for this future to complete, and then returns its 
result.
  */
 @Override
-public T get() throws InterruptedException, ExecutionException {
+public abstract T get(long timeout, TimeUnit unit) throws 
InterruptedException, ExecutionException, TimeoutException {

Review Comment:
   I'm not sure what you mean here -- for more on the problem, read 
https://issues.apache.org/jira/browse/KAFKA-14128 
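   For context, a hedged sketch of the behavior KAFKA-14128 is after: `KafkaFuture` 
already exposes the JDK `Future#get(long, TimeUnit)` overload, so callers can bound 
the wait instead of blocking forever. The admin client, topic name and timeout below 
are illustrative assumptions, not the actual patch:
   
   ```java
   import java.util.Collections;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   
   import org.apache.kafka.clients.admin.Admin;
   import org.apache.kafka.clients.admin.TopicDescription;
   import org.apache.kafka.common.KafkaFuture;
   
   class DescribeWithTimeoutExample {
       // Returns null if the broker does not answer within timeoutMs.
       static TopicDescription describe(Admin admin, String topic, long timeoutMs)
               throws InterruptedException, ExecutionException {
           KafkaFuture<TopicDescription> future =
                   admin.describeTopics(Collections.singleton(topic)).topicNameValues().get(topic);
           try {
               // Bounded wait instead of the unbounded get().
               return future.get(timeoutMs, TimeUnit.MILLISECONDS);
           } catch (TimeoutException e) {
               return null;
           }
       }
   }
   ```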



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Cerchie commented on a diff in pull request #13161: Kafka 14128

2023-01-31 Thread via GitHub


Cerchie commented on code in PR #13161:
URL: https://github.com/apache/kafka/pull/13161#discussion_r1092026440


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##
@@ -521,7 +524,7 @@ protected Map getNumPartitions(final 
Set topics,
 for (final Map.Entry> 
topicFuture : futures.entrySet()) {
 final String topicName = topicFuture.getKey();
 try {
-final TopicDescription topicDescription = 
topicFuture.getValue().get();
+final TopicDescription topicDescription = 
topicFuture.getValue().get(Long.parseLong(DEFAULT_API_TIMEOUT_MS_CONFIG), 
TimeUnit.MILLISECONDS);

Review Comment:
   removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] beardt commented on a diff in pull request #13168: Kafka 14565: Interceptor Resource Leak

2023-01-31 Thread via GitHub


beardt commented on code in PR #13168:
URL: https://github.com/apache/kafka/pull/13168#discussion_r1092017609


##
clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java:
##
@@ -55,6 +58,11 @@ public void configure(Map configs) {
 Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
 if (clientIdValue == null)
 throw new ConfigException("Mock consumer interceptor expects 
configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+
+CONFIG_COUNT.incrementAndGet();
+if (CONFIG_COUNT.get() == THROW_CONFIG_EXCEPTION_THRESHOLD.get()) {
+throw new ConfigException("Kafka producer creation failed. Failure 
may not have cleaned up listener thread resource.");

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dpcollins-google commented on a diff in pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-31 Thread via GitHub


dpcollins-google commented on code in PR #13162:
URL: https://github.com/apache/kafka/pull/13162#discussion_r1091834852


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1225,13 +1226,11 @@ public static long tryWriteTo(TransferableChannel 
destChannel,
  * @param length The number of bytes to write
  * @throws IOException For any errors writing to the output
  */
-public static void writeTo(DataOutput out, ByteBuffer buffer, int length) 
throws IOException {
+public static void writeTo(DataOutputStream out, ByteBuffer buffer, int 
length) throws IOException {
 if (buffer.hasArray()) {
 out.write(buffer.array(), buffer.position() + 
buffer.arrayOffset(), length);
 } else {
-int pos = buffer.position();
-for (int i = pos; i < length + pos; i++)
-out.writeByte(buffer.get(i));
+Channels.newChannel(out).write(buffer);

Review Comment:
   Sure. In a particular workload, this code path accounted for about 30% of CPU 
usage in flamegraphs. It is now 2-3% after a local patch.
   
   This hasn't been discussed on the dev list - it's just an attempt to upstream a 
small performance improvement.
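   For readers following along, a hedged sketch of the same bulk-write idea that 
still honors the `length` argument and leaves the caller's buffer position untouched 
(the actual upstream patch may differ):
   
   ```java
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.Channels;
   
   final class WriteToSketch {
       static void writeTo(DataOutputStream out, ByteBuffer buffer, int length) throws IOException {
           if (buffer.hasArray()) {
               // Heap buffers: single array write, exactly as before.
               out.write(buffer.array(), buffer.position() + buffer.arrayOffset(), length);
           } else {
               // Non-array buffers: one bulk channel write instead of a per-byte loop.
               ByteBuffer view = buffer.duplicate();     // independent position/limit, shared bytes
               view.limit(view.position() + length);     // cap the write at 'length' bytes
               Channels.newChannel(out).write(view);     // leaves the original buffer's position alone
           }
       }
   }
   ```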



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru commented on pull request #13164: MINOR: Fix scaladoc warnings

2023-01-31 Thread via GitHub


lucasbru commented on PR #13164:
URL: https://github.com/apache/kafka/pull/13164#issuecomment-1410168606

   @mjsax can we merge this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-31 Thread via GitHub


fvaleri commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-1410076642

   > @mimaison , I updated the system test to point to the new class. That one 
place seemed to be the only one relevant in this case.
   
   Do you have a test run output that shows it works and that the run time is 
similar? You can look at what I did for the JmxTool migration, which is also used 
by STs. I would also suggest discarding the first run of such a test, because the 
test framework needs to start a bunch of containers.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13095: KAFKA-14580: Moving EndToEndLatency from core to tools module

2023-01-31 Thread via GitHub


vamossagar12 commented on PR #13095:
URL: https://github.com/apache/kafka/pull/13095#issuecomment-141003

   @mimaison , I updated the system test to point to the new class. That one 
place seemed to be the only one relevant in this case. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Hangleton commented on a diff in pull request #13176: MINOR: some ZK migration code cleanups.

2023-01-31 Thread via GitHub


Hangleton commented on code in PR #13176:
URL: https://github.com/apache/kafka/pull/13176#discussion_r1091621015


##
core/src/main/scala/kafka/migration/MigrationPropagator.scala:
##
@@ -79,6 +81,18 @@ class MigrationPropagator(
 _image = image
   }
 
+  /**
+   * A very expensive function that creates a map with an entry for every 
partition that exists, from
+   * (topic name, partition index) to partition registration.
+   */
+  def materializePartitions(topicsImage: TopicsImage): 
util.Map[TopicPartition, PartitionRegistration] = {

Review Comment:
   Curious - what is the reason for using `util.Map` instead of one of the Scala 
map data structures?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13163: KAFKA-14653: MirrorMakerConfig using raw properties instead of post-r…

2023-01-31 Thread via GitHub


urbandan commented on code in PR #13163:
URL: https://github.com/apache/kafka/pull/13163#discussion_r1091581987


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -246,13 +246,22 @@ public Map originals(Map 
configOverrides) {
  */
 public Map<String, String> originalsStrings() {
 Map<String, String> copy = new RecordingMap<>();
+copyAsStrings(originals, copy);
+return copy;
+}
+
+/**
+ * Ensures that all values of a map are strings, and copies them to 
another map.
+ * @param originals The map to validate.
+ * @param copy The target to copy to.
+ */
+protected static void copyAsStrings(Map<String, ?> originals, Map<String, String> copy) {

Review Comment:
   Originally, I had this inlined in the MMConfig ctor, but didn't want to 
duplicate code - anyway, I'll inline it again, that's safer
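   For reference, a hedged sketch of the inlined validation being discussed (names 
and generics are assumptions): copy the raw settings into a String-to-String map and 
fail fast on any non-String value, mirroring what `AbstractConfig#originalsStrings` 
does today:
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   final class StringConfigCopy {
       static Map<String, String> copyAsStrings(Map<String, ?> originals) {
           Map<String, String> copy = new HashMap<>(originals.size());
           for (Map.Entry<String, ?> entry : originals.entrySet()) {
               Object value = entry.getValue();
               if (!(value instanceof String)) {
                   // Fail fast on non-String values instead of propagating them further.
                   throw new ClassCastException(
                           "Non-string value found in original settings for key " + entry.getKey());
               }
               copy.put(entry.getKey(), (String) value);
           }
           return copy;
       }
   }
   ```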



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] urbandan commented on a diff in pull request #13137: KAFKA-15086: Intra-cluster communication for Mirror Maker 2

2023-01-31 Thread via GitHub


urbandan commented on code in PR #13137:
URL: https://github.com/apache/kafka/pull/13137#discussion_r1091577828


##
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java:
##
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.mirror.MirrorMaker;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static 
org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+@Tag("integration")
+public class DedicatedMirrorIntegrationTest {
+
+private static final Logger log = 
LoggerFactory.getLogger(DedicatedMirrorIntegrationTest.class);
+
+private static final int TOPIC_CREATION_TIMEOUT_MS = 30_000;
+private static final int TOPIC_REPLICATION_TIMEOUT_MS = 30_000;
+
+private Map<String, EmbeddedKafkaCluster> kafkaClusters;
+private Map<String, MirrorMaker> mirrorMakers;
+
+@BeforeEach
+public void setup() {
+kafkaClusters = new HashMap<>();
+mirrorMakers = new HashMap<>();
+}
+
+@AfterEach
+public void teardown() throws Throwable {
+AtomicReference<Throwable> shutdownFailure = new AtomicReference<>();
+mirrorMakers.forEach((name, mirrorMaker) ->
+Utils.closeQuietly(mirrorMaker::stop, "MirrorMaker worker '" + 
name + "'", shutdownFailure)
+);
+kafkaClusters.forEach((name, kafkaCluster) ->
+Utils.closeQuietly(kafkaCluster::stop, "Embedded Kafka cluster '" 
+ name + "'", shutdownFailure)
+);
+if (shutdownFailure.get() != null) {
+throw shutdownFailure.get();
+}
+}
+
+private EmbeddedKafkaCluster startKafkaCluster(String name, int 
numBrokers, Properties brokerProperties) {
+if (kafkaClusters.containsKey(name))
+throw new IllegalStateException("Cannot register multiple Kafka 
clusters with the same name");
+
+EmbeddedKafkaCluster result = new EmbeddedKafkaCluster(numBrokers, 
brokerProperties);
+kafkaClusters.put(name, result);
+
+result.start();
+
+return result;
+}
+
+private MirrorMaker startMirrorMaker(String name, Map<String, String> mmProps) {
+if (mirrorMakers.containsKey(name))
+throw new IllegalStateException("Cannot register multiple 
MirrorMaker nodes with the same name");
+
+MirrorMaker result = new MirrorMaker(mmProps);
+mirrorMakers.put(name, result);
+
+result.start();
+
+return result;
+}
+
+/**
+ * Test that a multi-node dedicated cluster is able to dynamically detect 
new topics at runtime

Review Comment:
   Thanks for the clarification. I think using the EOS endpoints should be 
enough, as I don't really have any ideas on how to make sure that the task 
config update is triggered.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax merged pull request #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management

2023-01-31 Thread via GitHub


mjsax merged PR #13142:
URL: https://github.com/apache/kafka/pull/13142


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mjsax commented on pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

2023-01-31 Thread via GitHub


mjsax commented on PR #13143:
URL: https://github.com/apache/kafka/pull/13143#issuecomment-1409929859

   Merged the other PR -- can you rebase this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org