[jira] [Resolved] (KAFKA-14864) Memory leak in KStreamWindowAggregate with ON_WINDOW_CLOSE emit strategy

2023-04-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-14864.
-
Fix Version/s: 3.4.1
               3.3.3
   Resolution: Fixed

> Memory leak in KStreamWindowAggregate with ON_WINDOW_CLOSE emit strategy
> 
>
> Key: KAFKA-14864
> URL: https://issues.apache.org/jira/browse/KAFKA-14864
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Victoria Xia
>Assignee: Victoria Xia
>Priority: Major
> Fix For: 3.5.0, 3.4.1, 3.3.3
>
>
> The Streams DSL processor implementation for the ON_WINDOW_CLOSE emit 
> strategy during KStream windowed aggregations opens a key-value iterator but 
> does not call `close()` on it 
> ([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java#L203]),
>  despite the Javadocs for the iterator making clear that users must do so in 
> order to release resources 
> ([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java#L27]).
>   
> I discovered this bug while running load testing benchmarks and noticed that 
> some runs were sporadically hitting OOMs, so it is definitely possible to hit 
> this in practice.
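The standard fix for this class of leak is to open the store iterator in a try-with-resources block so that `close()` is always invoked, even on exceptions. Below is a minimal, self-contained sketch of that pattern; the `KeyValueIterator` interface here is a stand-in for Kafka's `org.apache.kafka.streams.state.KeyValueIterator`, and the `InMemoryIterator` class with its `closed` flag is an illustrative assumption, not Kafka code.

```java
import java.util.Iterator;
import java.util.List;

public class IteratorLeakDemo {
    // Minimal stand-in for Kafka's KeyValueIterator: it is AutoCloseable,
    // so callers must release it to free underlying store resources.
    interface KeyValueIterator<K, V> extends Iterator<V>, AutoCloseable {
        @Override
        void close(); // narrowed to not throw checked exceptions
    }

    // Illustrative in-memory iterator that records whether close() was called.
    static class InMemoryIterator implements KeyValueIterator<String, Integer> {
        private final Iterator<Integer> delegate;
        boolean closed = false;

        InMemoryIterator(List<Integer> values) { this.delegate = values.iterator(); }
        @Override public boolean hasNext() { return delegate.hasNext(); }
        @Override public Integer next() { return delegate.next(); }
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        InMemoryIterator iter = new InMemoryIterator(List.of(1, 2, 3));
        int sum = 0;
        // try-with-resources guarantees close() runs even if the loop throws,
        // which is the kind of guarantee the KAFKA-14864 fix needs.
        try (KeyValueIterator<String, Integer> it = iter) {
            while (it.hasNext()) sum += it.next();
        }
        System.out.println(sum + " " + iter.closed); // prints: 6 true
    }
}
```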



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mjsax commented on pull request #13470: KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close

2023-04-03 Thread via GitHub


mjsax commented on PR #13470:
URL: https://github.com/apache/kafka/pull/13470#issuecomment-1495353048

   Thx. Merged to `trunk` and cherry-picked to `3.4` and `3.3` branches.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14722) Make BooleanSerde public

2023-04-03 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708211#comment-17708211
 ] 

Matthias J. Sax commented on KAFKA-14722:
-

The docs PR has not been merged yet, thus the work is not yet complete. We keep 
Jiras open as reminders about this. Docs are as important as the feature itself.

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, kip, newbie
>
> KIP-907: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]
>  
> We introduced a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as an internal class. We could 
> make it public.
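For readers unfamiliar with what a boolean serde involves, here is a hedged, self-contained sketch of the single-byte encoding such a serde typically uses (1 for true, 0 for false, nulls passed through). The standalone `encode`/`decode` helpers below are illustrative assumptions, not the actual Kafka `Serializer`/`Deserializer` implementations behind the internal `BooleanSerde`.

```java
public class BooleanSerdeSketch {
    // Encode a Boolean as a single byte: 1 for true, 0 for false.
    static byte[] encode(Boolean value) {
        if (value == null) return null; // serdes conventionally pass nulls through
        return new byte[] { value ? (byte) 1 : (byte) 0 };
    }

    // Decode a single byte back into a Boolean; reject malformed input.
    static Boolean decode(byte[] data) {
        if (data == null) return null;
        if (data.length != 1) throw new IllegalArgumentException("expected exactly 1 byte");
        return data[0] != 0;
    }

    public static void main(String[] args) {
        // Round-trip both values to show the encoding is lossless.
        System.out.println(decode(encode(true)) + " " + decode(encode(false)));
        // prints: true false
    }
}
```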





[jira] [Updated] (KAFKA-14722) Make BooleanSerde public

2023-04-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14722:

Fix Version/s: 3.5.0

> Make BooleanSerde public
> 
>
> Key: KAFKA-14722
> URL: https://issues.apache.org/jira/browse/KAFKA-14722
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Spacrocket
>Priority: Minor
>  Labels: beginner, kip, newbie
> Fix For: 3.5.0
>
>
> KIP-907: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-907%3A+Add+Boolean+Serde+to+public+interface]
>  
> We introduced a "BooleanSerde" via 
> [https://github.com/apache/kafka/pull/13249] as an internal class. We could 
> make it public.





[jira] [Updated] (KAFKA-14864) Memory leak in KStreamWindowAggregate with ON_WINDOW_CLOSE emit strategy

2023-04-03 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14864?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-14864:

Affects Version/s: 3.3.2
                   3.4.0

> Memory leak in KStreamWindowAggregate with ON_WINDOW_CLOSE emit strategy
> 
>
> Key: KAFKA-14864
> URL: https://issues.apache.org/jira/browse/KAFKA-14864
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.4.0, 3.3.2
>Reporter: Victoria Xia
>Assignee: Victoria Xia
>Priority: Major
> Fix For: 3.5.0
>
>
> The Streams DSL processor implementation for the ON_WINDOW_CLOSE emit 
> strategy during KStream windowed aggregations opens a key-value iterator but 
> does not call `close()` on it 
> ([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractKStreamTimeWindowAggregateProcessor.java#L203]),
>  despite the Javadocs for the iterator making clear that users must do so in 
> order to release resources 
> ([link|https://github.com/apache/kafka/blob/5afedd9ac37c4d740f47867cfd31eaed15dc542f/streams/src/main/java/org/apache/kafka/streams/state/KeyValueIterator.java#L27]).
>   
> I discovered this bug while running load testing benchmarks and noticed that 
> some runs were sporadically hitting OOMs, so it is definitely possible to hit 
> this in practice.





[GitHub] [kafka] mjsax opened a new pull request, #13498: MINOR: fix log statement

2023-04-03 Thread via GitHub


mjsax opened a new pull request, #13498:
URL: https://github.com/apache/kafka/pull/13498

   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] mjsax merged pull request #13470: KAFKA-14864: Close iterator in KStream windowed aggregation emit on window close

2023-04-03 Thread via GitHub


mjsax merged PR #13470:
URL: https://github.com/apache/kafka/pull/13470





[GitHub] [kafka] mjsax merged pull request #13431: KAFKA-14491: [19/N] Combine versioned store RocksDB instances into one

2023-04-03 Thread via GitHub


mjsax merged PR #13431:
URL: https://github.com/apache/kafka/pull/13431





[GitHub] [kafka] mjsax merged pull request #13364: KAFKA-14491: [16/N] Add recovery logic for store inconsistency due to failed write

2023-04-03 Thread via GitHub


mjsax merged PR #13364:
URL: https://github.com/apache/kafka/pull/13364





[GitHub] [kafka] ijuma commented on a diff in pull request #13454: MINOR: move RecordReader from org.apache.kafka.tools to org.apache.co…

2023-04-03 Thread via GitHub


ijuma commented on code in PR #13454:
URL: https://github.com/apache/kafka/pull/13454#discussion_r1156697756


##
clients/src/main/java/org/apache/kafka/common/RecordReader.java:
##
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.tools;
+package org.apache.kafka.common;

Review Comment:
   This would follow a similar pattern as the storage api classes, which seems 
reasonable to me.






[GitHub] [kafka] junrao commented on a diff in pull request #13451: KAFKA-14808: fix leaderless partition issue when controller removes u…

2023-04-03 Thread via GitHub


junrao commented on code in PR #13451:
URL: https://github.com/apache/kafka/pull/13451#discussion_r1156683028


##
core/src/main/scala/kafka/controller/KafkaController.scala:
##
@@ -802,8 +802,11 @@ class KafkaController(val config: KafkaConfig,
       // If there is a reassignment already in progress, then some of the currently adding replicas
       // may be eligible for immediate removal, in which case we need to stop the replicas.
       val unneededReplicas = currentAssignment.replicas.diff(reassignment.replicas)
-      if (unneededReplicas.nonEmpty)
+      if (unneededReplicas.nonEmpty) {
+        // Elect a new leader if the current leader is inside the unneeded replicas.
+        moveReassignedPartitionLeaderIfRequired(topicPartition, reassignment)

Review Comment:
   Thanks for identifying this issue. Great find.
   
   Fixing this completely may be a bit tricky. The first issue is that 
`moveReassignedPartitionLeaderIfRequired` only selects the new leader from and 
sends the new leaderAndIsr request to the target replicas. We probably need 
both to be based on target + original replicas since the reassignment is not 
done at this moment.
   
   A second potential issue is that we may not be able to move the leader since 
other replicas are not in ISR. In this case, I am wondering if it's better to 
reject the new reassignment instead. This is a much rarer case and could be 
addressed in a separate jira.
   
   






[GitHub] [kafka] songnon commented on pull request #13451: KAFKA-14808: fix leaderless partition issue when controller removes u…

2023-04-03 Thread via GitHub


songnon commented on PR #13451:
URL: https://github.com/apache/kafka/pull/13451#issuecomment-1495270760

   @junrao I couldn't find your comment. It's lost somehow. :)





[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-04-03 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1495258058

   And this pr. thx! @showuon 





[GitHub] [kafka] hudeqi commented on pull request #13471: KAFKA-14868:Remove some forgotten metrics when the replicaManager is closed

2023-04-03 Thread via GitHub


hudeqi commented on PR #13471:
URL: https://github.com/apache/kafka/pull/13471#issuecomment-1495257711

   Hi, could you help to review this pr? @showuon 





[GitHub] [kafka] vcrfxia opened a new pull request, #13497: KAFKA-14834: [2/N] Test coverage for out-of-order data in joins

2023-04-03 Thread via GitHub


vcrfxia opened a new pull request, #13497:
URL: https://github.com/apache/kafka/pull/13497

   In preparation for updating DSL join processors to have updated semantics 
when versioned stores are used (cf 
[KIP-914](https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores)),
 this PR adds test coverage for out-of-order data in joins to the existing 
integration tests for stream-table joins and primary-key table-table joins. 
Follow-up PRs will build on top of this change by adding new tests for 
versioned stores, and the out-of-order data will produce different results in 
those settings.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] vcrfxia opened a new pull request, #13496: KAFKA-14834: [1/N] Add timestamped get to KTableValueGetter

2023-04-03 Thread via GitHub


vcrfxia opened a new pull request, #13496:
URL: https://github.com/apache/kafka/pull/13496

   (This PR is stacked on https://github.com/apache/kafka/pull/13442. Only the 
last commit needs to be reviewed separately.)
   
   In preparation for updating DSL processors to use versioned stores (cf 
[KIP-914](https://cwiki.apache.org/confluence/display/KAFKA/KIP-914%3A+Join+Processor+Semantics+for+Versioned+Stores)),
 this PR adds two new methods to KTableValueGetter: `isVersioned()` and 
`get(key, asOfTimestamp)` and updates all existing implementations accordingly.
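As background on what a `get(key, asOfTimestamp)` lookup involves, here is a hedged, self-contained sketch of an as-of-timestamp read over a versioned store, using a `TreeMap` keyed by the timestamp from which each version became valid. The `VersionedStore` class and its method names are illustrative assumptions for this email, not the `KTableValueGetter` API itself.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class AsOfLookupDemo {
    // Illustrative versioned store: for each key, keep every value version
    // indexed by its validFrom timestamp.
    static class VersionedStore<K, V> {
        private final Map<K, TreeMap<Long, V>> versions = new HashMap<>();

        void put(K key, V value, long timestamp) {
            versions.computeIfAbsent(key, k -> new TreeMap<>()).put(timestamp, value);
        }

        // get(key, asOfTimestamp): the latest version with validFrom <= asOfTimestamp.
        V get(K key, long asOfTimestamp) {
            TreeMap<Long, V> history = versions.get(key);
            if (history == null) return null;
            Map.Entry<Long, V> entry = history.floorEntry(asOfTimestamp);
            return entry == null ? null : entry.getValue();
        }
    }

    public static void main(String[] args) {
        VersionedStore<String, String> store = new VersionedStore<>();
        store.put("k", "v1", 10L);
        store.put("k", "v2", 20L);
        // An out-of-order record with timestamp 15 should see v1, not v2;
        // a lookup before any version exists should see nothing.
        System.out.println(store.get("k", 15L) + " " + store.get("k", 25L) + " " + store.get("k", 5L));
        // prints: v1 v2 null
    }
}
```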
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] guozhangwang commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


guozhangwang commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156551492


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
         maybeThrowTaskExceptions(taskCloseExceptions);
 
-        createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+        // If there are any transactions in flight and there are newly created active tasks, commit the tasks
+        // to avoid potential long restoration times.
+        if (processingMode == EXACTLY_ONCE_V2 && threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+            log.info("New active tasks were added and there is an inflight transaction. Attempting to commit tasks.");
+            final int numCommitted = commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+            if (numCommitted == -1) {

Review Comment:
   Sorry I made a typo there, I meant that we move the `rebalanceInProgress = 
false` line, which is currently in `onPartitionsAssigned` -> 
`onRebalanceComplete` into `onAssignment`, right before the `createNewTasks` 
line. By doing so we still keep the checking, knowing that at the time when we 
reach `onAssignment` it means the rebalance is basically done, and the check 
inside `commitTasksAndMaybeUpdateCommittableOffsets` would not block us.
   
   We would still have the check inside 
`commitTasksAndMaybeUpdateCommittableOffsets` for other callers.






[GitHub] [kafka] kirktrue opened a new pull request, #13495: KAFKA-14274: Introduce FetchRequestManager to integrate fetch into new consumer threading refactor

2023-04-03 Thread via GitHub


kirktrue opened a new pull request, #13495:
URL: https://github.com/apache/kafka/pull/13495

   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] pprovenzano commented on pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-04-03 Thread via GitHub


pprovenzano commented on PR #13374:
URL: https://github.com/apache/kafka/pull/13374#issuecomment-1495078769

   > I looked in `org.apache.kafka.clients.admin.ScramMechanism` and didn't see 
any message about the other ScramMechanism enum. So it seems like someone 
changing this code could still make a mistake and change one but not the other.
   
   Fixed. I have a plan to merge them. It is a fair amount of code churn though.





[GitHub] [kafka] jolshan commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jolshan commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156508649


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -45,10 +45,15 @@ abstract class InterBrokerSendThread(
 
   private val unsentRequests = new UnsentRequests
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
+  private val requestManagers = new ArrayList[InterBrokerRequestManager]()

Review Comment:
   Yeah. I think right now I just start the thread first and then add the 
managers. I'm ok with either approach.
   If we allow adding or removing, we could make some configurations dynamic in 
the future. 






[GitHub] [kafka] jolshan commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jolshan commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156509464


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {
+  
+  var interBrokerSender: InterBrokerSender = _

Review Comment:
   I'm removing this anyway in favor passing the sender when the manager is 
created.









[GitHub] [kafka] artemlivshits commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


artemlivshits commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156501035


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -45,10 +45,15 @@ abstract class InterBrokerSendThread(
 
   private val unsentRequests = new UnsentRequests
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
+  private val requestManagers = new ArrayList[InterBrokerRequestManager]()

Review Comment:
   The intent here is that requestManagers cannot be updated after the sender 
starts the thread, but I wonder if we actually honor this invariant in our 
bootstrap sequence.  At the very least we need to set a flag when the thread 
starts and throw an exception if an attempt to update requestManagers is made 
after the thread is started.  Or just add full synchronization around 
requestManagers access, which would allow modifying requestManagers after the 
sender thread has started (or use ConcurrentLinkedQueue).
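Both options mentioned in the comment can be combined in a small sketch: a concurrent collection for safe publication plus a started flag that rejects late registration. The class and method names below (`Sender`, `RequestManager`, `addRequestManager`) are illustrative assumptions echoing the review discussion, not the actual Kafka code, and the check-then-act in `addRequestManager` is deliberately simplified.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

public class SenderRegistrationDemo {
    interface RequestManager { }

    // Illustrative sender: managers may only be registered before start().
    static class Sender {
        private final Queue<RequestManager> requestManagers = new ConcurrentLinkedQueue<>();
        private final AtomicBoolean started = new AtomicBoolean(false);

        void addRequestManager(RequestManager manager) {
            if (started.get()) {
                // Enforce the invariant the review comment asks for:
                // no registration after the sender thread has started.
                throw new IllegalStateException("sender already started");
            }
            requestManagers.add(manager);
        }

        void start() { started.set(true); }
        int managerCount() { return requestManagers.size(); }
    }

    public static void main(String[] args) {
        Sender sender = new Sender();
        sender.addRequestManager(new RequestManager() { });
        sender.start();
        boolean rejected = false;
        try {
            sender.addRequestManager(new RequestManager() { });
        } catch (IllegalStateException e) {
            rejected = true; // late registration is refused
        }
        System.out.println(sender.managerCount() + " " + rejected); // prints: 1 true
    }
}
```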



##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {
+  
+  var interBrokerSender: InterBrokerSender = _

Review Comment:
   I don't think this can be updated and read concurrently -- all updates 
happen in the startup thread before the object is visible to request threads.  
We can add a comment to clarify.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-04-03 Thread via GitHub


pprovenzano commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1156501556


##
clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java:
##
@@ -40,11 +48,13 @@ public enum ScramMechanism {
 MECHANISMS_MAP = Collections.unmodifiableMap(map);
 }
 
-    ScramMechanism(String hashAlgorithm, String macAlgorithm, int minIterations) {
+    ScramMechanism(byte type, String hashAlgorithm, String macAlgorithm, int minIterations, int maxIterations) {

Review Comment:
   Fixed. Note that checkstyleMain sets java indentation here at 4 spaces not 2.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-04-03 Thread via GitHub


pprovenzano commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1156501112


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -235,9 +379,21 @@ object StorageTool extends Logging {
 metaProperties: MetaProperties,
 metadataVersion: MetadataVersion,
 ignoreFormatted: Boolean): Int = {
+    val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, None, "format command")
+    formatCommand(stream, directories, metaProperties, bootstrapMetadata, metadataVersion, ignoreFormatted)
+  }
+
+
+  def formatCommand(stream: PrintStream,

Review Comment:
   Fixed, along with a couple others.
   What line length do we use before we need to split the line?
   






[GitHub] [kafka] junrao commented on a diff in pull request #13454: MINOR: move RecordReader from org.apache.kafka.tools to org.apache.co…

2023-04-03 Thread via GitHub


junrao commented on code in PR #13454:
URL: https://github.com/apache/kafka/pull/13454#discussion_r1156500801


##
clients/src/main/java/org/apache/kafka/common/RecordReader.java:
##
@@ -14,10 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.tools;
+package org.apache.kafka.common;

Review Comment:
   Another possibility is to move this to the tools module under the package 
org.apache.kafka.tools.api, which seems a bit cleaner?






[GitHub] [kafka] pprovenzano commented on a diff in pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-04-03 Thread via GitHub


pprovenzano commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1156499455


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -395,9 +395,7 @@ private void formatNodeAndLog(MetaProperties properties, String metadataLogDir,
         try (PrintStream out = new PrintStream(stream)) {
             StorageTool.formatCommand(out,
                 JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
-                properties,
-                MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,
-                false);
+                properties, MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, false);

Review Comment:
   Nope! Fixed!






[GitHub] [kafka] jolshan commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jolshan commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156496683


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {
+  
+  var interBrokerSender: InterBrokerSender = _

Review Comment:
   Got it. If we go back to the argument passed in as a parameter, then we can 
just eliminate this. 






[GitHub] [kafka] jolshan commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jolshan commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156496361


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -45,10 +45,15 @@ abstract class InterBrokerSendThread(
 
   private val unsentRequests = new UnsentRequests
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
+  private val requestManagers = new ArrayList[InterBrokerRequestManager]()

Review Comment:
   Got it. Thanks for the explanation.






[GitHub] [kafka] jolshan commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jolshan commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156496118


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -59,8 +61,9 @@ object TransactionCoordinator {
   time, metrics)
 
 val logContext = new LogContext(s"[TransactionCoordinator 
id=${config.brokerId}] ")
-val txnMarkerChannelManager = TransactionMarkerChannelManager(config, 
metrics, metadataCache, txnStateManager,
-  time, logContext)
+val txnMarkerChannelManager = new TransactionMarkerChannelManager(config, 
metadataCache, txnStateManager,
+  time)
+interBrokerSender.addRequestManager(txnMarkerChannelManager)

Review Comment:
   Ok. I will redo it with the argument passed in. 






[GitHub] [kafka] artemlivshits commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


artemlivshits commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156488622


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -59,8 +61,9 @@ object TransactionCoordinator {
   time, metrics)
 
 val logContext = new LogContext(s"[TransactionCoordinator 
id=${config.brokerId}] ")
-val txnMarkerChannelManager = TransactionMarkerChannelManager(config, 
metrics, metadataCache, txnStateManager,
-  time, logContext)
+val txnMarkerChannelManager = new TransactionMarkerChannelManager(config, 
metadataCache, txnStateManager,
+  time)
+interBrokerSender.addRequestManager(txnMarkerChannelManager)

Review Comment:
   Just to clarify -- prior to my suggestion, the code both passed the sender as 
a constructor argument and also required adding the request manager, so I 
suggested eliminating one of the two, which happened to be the constructor 
argument.  The other way around (keep just the constructor argument, and 
eliminate adding the request manager) sounds good to me as well; it's just that 
doing both seems redundant.






[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-03 Thread via GitHub


philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156483833


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -2484,6 +2511,48 @@ public void 
shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
 );
 }
 
+@Test
+public void shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled() {
+final ProcessorStateManager processorStateManager = mockStateManager();
+recordCollector = mock(RecordCollectorImpl.class);
+doReturn(singletonMap(partition1, 1L)).when(recordCollector).offsets();
+
+task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true, 
processorStateManager);
+task.initializeIfNeeded();
+task.completeRestoration(noOpResetter -> { });
+verify(processorStateManager, times(1)).checkpoint();
+verify(recordCollector, times(1)).offsets();
+}
+
+@Test
+public void shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled() {
+final ProcessorStateManager processorStateManager = mockStateManager();
+recordCollector = mock(RecordCollectorImpl.class);
+
+task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true, 
processorStateManager);
+task.initializeIfNeeded();
+task.completeRestoration(noOpResetter -> { });
+verify(processorStateManager, never()).checkpoint();
+verify(processorStateManager, never()).changelogOffsets();
+verify(recordCollector, never()).offsets();
+}
+
+private ProcessorStateManager mockStateManager() {
+final ProcessorStateManager manager = spy(new ProcessorStateManager(

Review Comment:
   I'm doing a partial mock here to make sure the correct methods are being 
invoked.






[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-03 Thread via GitHub


philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156483107


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -44,6 +44,7 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;

Review Comment:
   The IDE moved this, but it seems to be in an alphabetical sequence now. Can 
we keep it here?






[GitHub] [kafka] philipnee commented on pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-03 Thread via GitHub


philipnee commented on PR #13269:
URL: https://github.com/apache/kafka/pull/13269#issuecomment-1495021777

   Hey @cadonna - Again, thanks for the reviews; sorry about the auto-cleanup 
from my IDE, I've cleaned up most of the unnecessary changes.





[GitHub] [kafka] junrao merged pull request #13480: MINOR: Fix typos in `security.html`

2023-04-03 Thread via GitHub


junrao merged PR #13480:
URL: https://github.com/apache/kafka/pull/13480





[GitHub] [kafka] junrao commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


junrao commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156467464


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -59,8 +61,9 @@ object TransactionCoordinator {
   time, metrics)
 
 val logContext = new LogContext(s"[TransactionCoordinator 
id=${config.brokerId}] ")
-val txnMarkerChannelManager = TransactionMarkerChannelManager(config, 
metrics, metadataCache, txnStateManager,
-  time, logContext)
+val txnMarkerChannelManager = new TransactionMarkerChannelManager(config, 
metadataCache, txnStateManager,
+  time)
+interBrokerSender.addRequestManager(txnMarkerChannelManager)

Review Comment:
   Yes, it's a minor thing. We could leave this as it is for now.






[GitHub] [kafka] junrao commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


junrao commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156467080


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {
+  
+  var interBrokerSender: InterBrokerSender = _

Review Comment:
   The issue is that `interBrokerSender` is being set and read in different 
threads. If there is no memory barrier, there is no guarantee that the reader 
will see the latest value being set. 
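   To illustrate the point (this is a hedged sketch, not the actual Kafka code 
-- `SenderHolder` and the `Object`-typed field are hypothetical stand-ins for 
the Scala `var interBrokerSender`), marking the field `volatile` in Java is the 
standard way to add the memory barrier the comment describes:

   ```java
   public class SenderHolder {
       // volatile inserts the memory barrier referred to above: a write to
       // this field by one thread happens-before every subsequent read of the
       // same field by other threads, so readers never see a stale reference.
       private volatile Object interBrokerSender;

       public void setSender(Object sender) {
           this.interBrokerSender = sender; // written by the setup thread
       }

       public Object getSender() {
           return interBrokerSender;        // read by the sender thread
       }
   }
   ```

   Without `volatile` (or another synchronization mechanism such as a lock or 
`AtomicReference`), the Java memory model permits the reading thread to observe 
`null` indefinitely.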






[GitHub] [kafka] junrao commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


junrao commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156466725


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -45,10 +45,15 @@ abstract class InterBrokerSendThread(
 
   private val unsentRequests = new UnsentRequests
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
+  private val requestManagers = new ArrayList[InterBrokerRequestManager]()

Review Comment:
   Regarding synchronization, it seems that it's possible for `requestManagers` 
to be iterated through `drainGeneratedRequests()` while it's being modified 
through `addRequestManager()` concurrently. In that case, ArrayList could throw 
ConcurrentModificationException.
   
   Regarding visibility, since there is no memory barrier for 
`requestManagers`, if one thread changes `requestManagers`, there is no 
guarantee that another thread will see the latest value inside 
`requestManagers`.
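   A minimal sketch of one way to address both problems at once (hypothetical 
class and `String`-typed managers, not the actual patch): 
`java.util.concurrent.CopyOnWriteArrayList` copies its backing array on every 
mutation, so iteration never throws `ConcurrentModificationException`, and its 
internal volatile array reference provides the visibility guarantee.

   ```java
   import java.util.List;
   import java.util.concurrent.CopyOnWriteArrayList;

   public class ManagerRegistry {
       // CopyOnWriteArrayList snapshots the backing array on each add, so
       // concurrent iteration sees a consistent (possibly slightly stale)
       // view instead of throwing ConcurrentModificationException.
       private final List<String> requestManagers = new CopyOnWriteArrayList<>();

       public void addRequestManager(String manager) {
           requestManagers.add(manager);
       }

       public int drainGeneratedRequests() {
           int drained = 0;
           for (String manager : requestManagers) { // iterates over a snapshot
               drained++;
           }
           return drained;
       }
   }
   ```

   The copy-on-write trade-off fits this use case well: the list is mutated 
only a handful of times at startup but iterated on every poll.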
   






[GitHub] [kafka] cmccabe commented on a diff in pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-04-03 Thread via GitHub


cmccabe commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1156401794


##
core/src/test/java/kafka/testkit/KafkaClusterTestKit.java:
##
@@ -395,9 +395,7 @@ private void formatNodeAndLog(MetaProperties properties, 
String metadataLogDir,
 try (PrintStream out = new PrintStream(stream)) {
 StorageTool.formatCommand(out,
 
JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
-properties,
-MetadataVersion.MINIMUM_BOOTSTRAP_VERSION,
-false);
+properties, 
MetadataVersion.MINIMUM_BOOTSTRAP_VERSION, false);

Review Comment:
   do we have to do this whitespace change?






[GitHub] [kafka] cmccabe commented on a diff in pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-04-03 Thread via GitHub


cmccabe commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1156401424


##
core/src/main/scala/kafka/tools/StorageTool.scala:
##
@@ -235,9 +379,21 @@ object StorageTool extends Logging {
 metaProperties: MetaProperties,
 metadataVersion: MetadataVersion,
 ignoreFormatted: Boolean): Int = {
+val bootstrapMetadata = buildBootstrapMetadata(metadataVersion, None, 
"format command")
+formatCommand(stream, directories, metaProperties, bootstrapMetadata, 
metadataVersion, ignoreFormatted)
+  }
+
+
+  def formatCommand(stream: PrintStream,

Review Comment:
   can you use standard formatting here
   ```
   Foo(
 bar
 baz
 quux
   )
   ```






[GitHub] [kafka] cmccabe commented on a diff in pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-04-03 Thread via GitHub


cmccabe commented on code in PR #13374:
URL: https://github.com/apache/kafka/pull/13374#discussion_r1156400641


##
clients/src/main/java/org/apache/kafka/common/security/scram/internals/ScramMechanism.java:
##
@@ -40,11 +48,13 @@ public enum ScramMechanism {
 MECHANISMS_MAP = Collections.unmodifiableMap(map);
 }
 
-ScramMechanism(String hashAlgorithm, String macAlgorithm, int 
minIterations) {
+ScramMechanism(byte type, String hashAlgorithm, String macAlgorithm, int 
minIterations, int maxIterations) {

Review Comment:
   can you use standard indentation here
   ```
   Foo(
 bar
 baz
 quux
   ) {
   }
   ```






[GitHub] [kafka] cmccabe commented on pull request #13374: KAFKA-14765 and KAFKA-14776: Support for SCRAM at bootstrap with integration tests

2023-04-03 Thread via GitHub


cmccabe commented on PR #13374:
URL: https://github.com/apache/kafka/pull/13374#issuecomment-1494895163

   I looked in `org.apache.kafka.clients.admin.ScramMechanism` and didn't see 
any comment about the other ScramMechanism enum. So it seems like someone 
changing this code could still make a mistake and change one but not the other.





[jira] [Resolved] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2023-04-03 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris resolved KAFKA-6007.

  Assignee: Greg Harris  (was: Konstantine Karantasis)
Resolution: Fixed

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Stephane Maarek
>Assignee: Greg Harris
>Priority: Major
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.1.2, 2.3.2, 2.5.0, 2.4.0, 2.2.2
>
>
> Kafka Connect can't validate a custom transformation if placed in plugins 
> path.
> Here's the output I get on the validate call:
> {code:java}
> Invalid value com.mycorp.kafka.transforms.impl.FlattenSinkRecord for 
> configuration transforms.Flat.type: Class 
> com.mycorp.kafka.transforms.impl.FlattenSinkRecord could not be found.
> Invalid value null for configuration transforms.Flat.type: Not a 
> Transformation
> "recommended_values": [   
> "com.mycorp.kafka.transforms.Flatten$Key",
> "com.mycorp.kafka.transforms.Flatten$Value",
> "com.mycorp.kafka.transforms.impl.FlattenSinkRecord",
> "org.apache.kafka.connect.transforms.Cast$Key",
> "org.apache.kafka.connect.transforms.Cast$Value",
> "org.apache.kafka.connect.transforms.ExtractField$Key",
> "org.apache.kafka.connect.transforms.ExtractField$Value",
> "org.apache.kafka.connect.transforms.Flatten$Key",
> "org.apache.kafka.connect.transforms.Flatten$Value",
> "org.apache.kafka.connect.transforms.HoistField$Key",
> "org.apache.kafka.connect.transforms.HoistField$Value",
> "org.apache.kafka.connect.transforms.InsertField$Key",
> "org.apache.kafka.connect.transforms.InsertField$Value",
> "org.apache.kafka.connect.transforms.MaskField$Key",
> "org.apache.kafka.connect.transforms.MaskField$Value",
> "org.apache.kafka.connect.transforms.RegexRouter",
> "org.apache.kafka.connect.transforms.ReplaceField$Key",
> "org.apache.kafka.connect.transforms.ReplaceField$Value",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Key",
> "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
> "org.apache.kafka.connect.transforms.TimestampConverter$Key",
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "org.apache.kafka.connect.transforms.TimestampRouter",
> "org.apache.kafka.connect.transforms.ValueToKey"],
> {code}
> As you can see the class appear in the recommended values (!) but can't be 
> picked up on the validate call. 
> I believe it's because the recommender implements class discovery using 
> plugins:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L194
> But the class inference itself doesn't:
> https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L199
> (I'm not an expert in class loading though, just a guess... Unsure how to fix)
> A quick fix is to add the transformations in the ClassPath itself, but that 
> defeats the point a bit. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2023-04-03 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-6007:
---
Fix Version/s: 1.0.3
   1.1.2
   2.0.2
   2.1.2
   2.3.2
   2.5.0
   2.4.0
   2.2.2

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 1.0.3, 1.1.2, 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.5.0, 2.3.2
>
>





[jira] [Commented] (KAFKA-6007) Connect can't validate against transforms in plugins.path

2023-04-03 Thread Greg Harris (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708085#comment-17708085
 ] 

Greg Harris commented on KAFKA-6007:


I believe that this failure mode was addressed in 
[https://github.com/apache/kafka/pull/7315] by using the DelegatingClassLoader 
when instantiating the ConnectorConfig instead of the parent loader for the 
classpath. I'll resolve this with the same fix versions as KAFKA-8340.

> Connect can't validate against transforms in plugins.path
> -
>
> Key: KAFKA-6007
> URL: https://issues.apache.org/jira/browse/KAFKA-6007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0
>Reporter: Stephane Maarek
>Assignee: Konstantine Karantasis
>Priority: Major
>





[GitHub] [kafka] rondagostino commented on pull request #13280: KAFKA-14735: Improve KRaft metadata image change performance at high …

2023-04-03 Thread via GitHub


rondagostino commented on PR #13280:
URL: https://github.com/apache/kafka/pull/13280#issuecomment-1494811199

   The persistent collection library space remains unsettled (frustrating that 
it would stay this way even after so many years). I therefore added wrapper 
classes and import controls so that we don't end up sprinkling library-specific 
code all over the place.  This will help insulate us from changes in this space 
over time.
   
   Given that the `PColllections` benchmarks are a massive improvement over the 
existing code, we can just go with `PCollections` here since that is what is 
needed in https://github.com/apache/kafka/pull/13437.
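   The wrapper idea can be sketched like this (a hedged illustration, not the 
actual PR code -- the interface name and the naive copy-on-write backing are 
hypothetical; the real backing would delegate to a structural-sharing library 
such as PCollections):

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Call sites depend on this interface, never on a specific persistent
   // collection library, so the backing implementation can be swapped later
   // without touching callers.
   interface ImmutableMap<K, V> {
       V get(K key);
       ImmutableMap<K, V> updated(K key, V value);
   }

   // Naive backing for illustration: copies the whole map on every update,
   // which is O(n) per call. A structural-sharing library makes this O(log n)
   // while keeping the same interface.
   final class CopyingImmutableMap<K, V> implements ImmutableMap<K, V> {
       private final Map<K, V> delegate;

       CopyingImmutableMap() {
           this.delegate = new HashMap<>();
       }

       private CopyingImmutableMap(Map<K, V> delegate) {
           this.delegate = delegate;
       }

       @Override
       public V get(K key) {
           return delegate.get(key);
       }

       @Override
       public ImmutableMap<K, V> updated(K key, V value) {
           Map<K, V> copy = new HashMap<>(delegate); // old instance untouched
           copy.put(key, value);
           return new CopyingImmutableMap<>(copy);
       }
   }
   ```

   Because every update returns a new instance and never mutates the old one, 
readers of a previous metadata image can keep using it safely while a new image 
is built.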





[GitHub] [kafka] dorwi commented on pull request #13494: MINOR: update the batch size connect task metrics documentation

2023-04-03 Thread via GitHub


dorwi commented on PR #13494:
URL: https://github.com/apache/kafka/pull/13494#issuecomment-1494781625

   It was not clear in the documentation whether the size referred to the batch 
size in bytes or to the number of records. Changed the wording to match how 
it's done elsewhere, for example in transactionSizeMax.





[jira] [Commented] (KAFKA-14237) Kafka TLS Doesn't Present Intermediary Certificates when using PEM

2023-04-03 Thread Manikumar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17708071#comment-17708071
 ] 

Manikumar commented on KAFKA-14237:
---

[~sophokles73] Thanks for your interest. You can take a look at KIP and 
implementation:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-651+-+Support+PEM+format+for+SSL+certificates+and+private+key
[https://github.com/apache/kafka/pull/9345/files]

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/ssl/DefaultSslEngineFactory.java#L273

> Kafka TLS Doesn't Present Intermediary Certificates when using PEM
> --
>
> Key: KAFKA-14237
> URL: https://issues.apache.org/jira/browse/KAFKA-14237
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 3.2.1
> Environment: Deployed using the Bitnami Helm 
> Chart(https://github.com/bitnami/charts/tree/master/bitnami/kafka)
> The Bitnami Helm Chart uses Docker Image: 
> https://github.com/bitnami/containers/tree/main/bitnami/kafka
> An issue was already opened with Bitnami and they told us to send this 
> upstream: https://github.com/bitnami/containers/issues/6654
>Reporter: Ryan R
>Priority: Blocker
>
> When using PEM TLS certificates, Kafka does not present the entire 
> certificate chain.
>  
> Our {{/opt/bitnami/kafka/config/server.properties}} file looks like this:
> {code:java}
> ssl.keystore.type=PEM
> ssl.truststore.type=PEM
> ssl.keystore.key=-BEGIN PRIVATE KEY- \
> 
> -END PRIVATE KEY-
> ssl.keystore.certificate.chain=-BEGIN CERTIFICATE- \
> 
> -END CERTIFICATE- \
> -BEGIN CERTIFICATE- \
> MIIFFjCCAv6gAwIBAgIRAJErCErPDBinU/bWLiWnX1owDQYJKoZIhvcNAQELBQAw \
> TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh \
> cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMjAwOTA0MDAwMDAw \
> WhcNMjUwOTE1MTYwMDAwWjAyMQswCQYDVQQGEwJVUzEWMBQGA1UEChMNTGV0J3Mg \
> RW5jcnlwdDELMAkGA1UEAxMCUjMwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK \
> AoIBAQC7AhUozPaglNMPEuyNVZLD+ILxmaZ6QoinXSaqtSu5xUyxr45r+XXIo9cP \
> R5QUVTVXjJ6oojkZ9YI8QqlObvU7wy7bjcCwXPNZOOftz2nwWgsbvsCUJCWH+jdx \
> sxPnHKzhm+/b5DtFUkWWqcFTzjTIUu61ru2P3mBw4qVUq7ZtDpelQDRrK9O8Zutm \
> NHz6a4uPVymZ+DAXXbpyb/uBxa3Shlg9F8fnCbvxK/eG3MHacV3URuPMrSXBiLxg \
> Z3Vms/EY96Jc5lP/Ooi2R6X/ExjqmAl3P51T+c8B5fWmcBcUr2Ok/5mzk53cU6cG \
> /kiFHaFpriV1uxPMUgP17VGhi9sVAgMBAAGjggEIMIIBBDAOBgNVHQ8BAf8EBAMC \
> AYYwHQYDVR0lBBYwFAYIKwYBBQUHAwIGCCsGAQUFBwMBMBIGA1UdEwEB/wQIMAYB \
> Af8CAQAwHQYDVR0OBBYEFBQusxe3WFbLrlAJQOYfr52LFMLGMB8GA1UdIwQYMBaA \
> FHm0WeZ7tuXkAXOACIjIGlj26ZtuMDIGCCsGAQUFBwEBBCYwJDAiBggrBgEFBQcw \
> AoYWaHR0cDovL3gxLmkubGVuY3Iub3JnLzAnBgNVHR8EIDAeMBygGqAYhhZodHRw \
> Oi8veDEuYy5sZW5jci5vcmcvMCIGA1UdIAQbMBkwCAYGZ4EMAQIBMA0GCysGAQQB \
> gt8TAQEBMA0GCSqGSIb3DQEBCwUAA4ICAQCFyk5HPqP3hUSFvNVneLKYY611TR6W \
> PTNlclQtgaDqw+34IL9fzLdwALduO/ZelN7kIJ+m74uyA+eitRY8kc607TkC53wl \
> ikfmZW4/RvTZ8M6UK+5UzhK8jCdLuMGYL6KvzXGRSgi3yLgjewQtCPkIVz6D2QQz \
> CkcheAmCJ8MqyJu5zlzyZMjAvnnAT45tRAxekrsu94sQ4egdRCnbWSDtY7kh+BIm \
> lJNXoB1lBMEKIq4QDUOXoRgffuDghje1WrG9ML+Hbisq/yFOGwXD9RiX8F6sw6W4 \
> avAuvDszue5L3sz85K+EC4Y/wFVDNvZo4TYXao6Z0f+lQKc0t8DQYzk1OXVu8rp2 \
> yJMC6alLbBfODALZvYH7n7do1AZls4I9d1P4jnkDrQoxB3UqQ9hVl3LEKQ73xF1O \
> yK5GhDDX8oVfGKF5u+decIsH4YaTw7mP3GFxJSqv3+0lUFJoi5Lc5da149p90Ids \
> hCExroL1+7mryIkXPeFM5TgO9r0rvZaBFOvV2z0gp35Z0+L4WPlbuEjN/lxPFin+ \
> HlUjr8gRsI3qfJOQFy/9rKIJR0Y/8Omwt/8oTWgy1mdeHmmjk7j1nYsvC9JSQ6Zv \
> MldlTTKB3zhThV1+XWYp6rjd5JW1zbVWEkLNxE7GJThEUG3szgBVGP7pSWTUTsqX \
> nLRbwHOoq7hHwg== \
> -END CERTIFICATE- \
> ssl.truststore.certificates=-BEGIN CERTIFICATE- \
> MIIFazCCA1OgAwIBAgIRAIIQz7DSQONZRGPgu2OCiwAwDQYJKoZIhvcNAQELBQAw \
> TzELMAkGA1UEBhMCVVMxKTAnBgNVBAoTIEludGVybmV0IFNlY3VyaXR5IFJlc2Vh \
> cmNoIEdyb3VwMRUwEwYDVQQDEwxJU1JHIFJvb3QgWDEwHhcNMTUwNjA0MTEwNDM4 \
> WhcNMzUwNjA0MTEwNDM4WjBPMQswCQYDVQQGEwJVUzEpMCcGA1UEChMgSW50ZXJu \
> ZXQgU2VjdXJpdHkgUmVzZWFyY2ggR3JvdXAxFTATBgNVBAMTDElTUkcgUm9vdCBY \
> MTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAK3oJHP0FDfzm54rVygc \
> h77ct984kIxuPOZXoHj3dcKi/vVqbvYATyjb3miGbESTtrFj/RQSa78f0uoxmyF+ \
> 0TM8ukj13Xnfs7j/EvEhmkvBioZxaUpmZmyPfjxwv60pIgbz5MDmgK7iS4+3mX6U \
> A5/TR5d8mUgjU+g4rk8Kb4Mu0UlXjIB0ttov0DiNewNwIRt18jA8+o+u3dpjq+sW \
> T8KOEUt+zwvo/7V3LvSye0rgTBIlDHCNAymg4VMk7BPZ7hm/ELNKjD+Jo2FR3qyH \
> B5T0Y3HsLuJvW5iB4YlcNHlsdu87kGJ55tukmi8mxdAQ4Q7e2RCOFvu396j3x+UC \
> B5iPNgiV5+I3lg02dZ77DnKxHZu8A/lJBdiB3QW0KtZB6awBdpUKD9jf1b0SHzUv \
> KBds0pjBqAlkd25HN7rOrFleaJ1/ctaJxQZBKT5ZPt0m9STJEadao0xAH0ahmbWn \
> OlFuhjuefXKnEgV4We0+UXgVCwOPjdAvBbI+e0ocS3MFEvzG6uBQE3xDk3SzynTn \
> jh8BCNAw1FtxNrQHusEwMFxIt4I7mKZ9YIqioymCzLq9gwQbooMDQaHWBfEbwrbw \
> qHyGO0aoSCqI3Haadr8faqU9GY/rOPNk3sgrDQoo//fb4hVC1CLQJ13hef4Y53CI \
> 

[GitHub] [kafka] lucasbru commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


lucasbru commented on PR #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1494744241

   > Regarding system tests, is there a way to trigger via Jenkins ? (Sorry I am not aware of this).
   
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/build?delay=0sec
   
   set `TEST_PATH` to `tests/kafkatest/tests/streams/` to only run streams tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-03 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1156255508


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignorTest.java:
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ServerSideStickyRangeAssignorTest {
+
+private final ServerSideStickyRangeAssignor assignor = new 
ServerSideStickyRangeAssignor();
+
+private final String topic1Name = "topic1";
+private final Uuid topic1Uuid = Uuid.randomUuid();
+
+private final String topic2Name = "topic2";
+private final Uuid topic2Uuid = Uuid.randomUuid();
+
+private final String topic3Name = "topic3";
+private final Uuid topic3Uuid = Uuid.randomUuid();
+
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+List<Uuid> subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.getMembers().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(topic3Name, 2));
+// Members
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+// Consumer A
+List<Uuid> subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List<Uuid> subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<Uuid, Set<Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(1)));
+
+assertAssignment(expectedAssignment, computedAssignment);
+}
+
+@Test
+public void 
testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() {
+// A -> T1, T2 // B -> T3 // C -> T2, T3 // T1 -> 3 Partitions // T2 
-> 3 Partitions // T3 -> 2 Partitions
+// Topics
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic2Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(topic3Name, 2));
+// Members
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+// 

[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-03 Thread via GitHub


jeffkbkim commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1156251873


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignorTest.java:
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ServerSideStickyRangeAssignorTest {
+
+private final ServerSideStickyRangeAssignor assignor = new 
ServerSideStickyRangeAssignor();
+
+private final String topic1Name = "topic1";
+private final Uuid topic1Uuid = Uuid.randomUuid();
+
+private final String topic2Name = "topic2";
+private final Uuid topic2Uuid = Uuid.randomUuid();
+
+private final String topic3Name = "topic3";
+private final Uuid topic3Uuid = Uuid.randomUuid();
+
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+List<Uuid> subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.getMembers().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(topic3Name, 2));
+// Members
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+// Consumer A
+List<Uuid> subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List<Uuid> subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<Uuid, Set<Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(1)));
+
+assertAssignment(expectedAssignment, computedAssignment);
+}
+
+@Test
+public void 
testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() {
+// A -> T1, T2 // B -> T3 // C -> T2, T3 // T1 -> 3 Partitions // T2 
-> 3 Partitions // T3 -> 2 Partitions
+// Topics
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic2Uuid, new AssignmentTopicMetadata(topic1Name, 3));

Review Comment:
   no, i'm referring to `AssignmentTopicMetadata(topic1Name, 3)`




[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-03 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1156241130


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignorTest.java:
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ServerSideStickyRangeAssignorTest {
+
+private final ServerSideStickyRangeAssignor assignor = new 
ServerSideStickyRangeAssignor();
+
+private final String topic1Name = "topic1";
+private final Uuid topic1Uuid = Uuid.randomUuid();
+
+private final String topic2Name = "topic2";
+private final Uuid topic2Uuid = Uuid.randomUuid();
+
+private final String topic3Name = "topic3";
+private final Uuid topic3Uuid = Uuid.randomUuid();
+
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+List<Uuid> subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.getMembers().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(topic3Name, 2));
+// Members
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+// Consumer A
+List<Uuid> subscribedTopicsA = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List<Uuid> subscribedTopicsB = new ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<Uuid, Set<Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(1)));
+
+assertAssignment(expectedAssignment, computedAssignment);
+}
+
+@Test
+public void 
testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() {
+// A -> T1, T2 // B -> T3 // C -> T2, T3 // T1 -> 3 Partitions // T2 
-> 3 Partitions // T3 -> 2 Partitions
+// Topics
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic2Uuid, new AssignmentTopicMetadata(topic1Name, 3));

Review Comment:
   No we want all the mappings to be with Uuid




[GitHub] [kafka] rreddy-22 commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-03 Thread via GitHub


rreddy-22 commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1156240340


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignor.java:
##
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The Server Side Sticky Range Assignor inherits properties of both the 
range assignor and the sticky assignor.
+ * Properties are :-
+ * 
+ *  1) Each consumer must get at least one partition per topic that it is 
subscribed to whenever the number of consumers is
+ *less than or equal to the number of partitions for that topic. (Range) 

+ *  2) Partitions should be assigned to consumers in a way that 
facilitates join operations where required. (Range) 
+ *This can only be done if the topics are co-partitioned in the first place
+ *Co-partitioned:-
+ *Two streams are co-partitioned if the following conditions are met:-
+ * ->The keys must have the same schemas
+ * ->The topics involved must have the same number of partitions
+ *  3) Consumers should retain as much as their previous assignment as 
possible. (Sticky) 
+ * 
+ * 
+ *
+ * The algorithm works mainly in 5 steps described below
+ * 
+ *  1) Get a map of the consumersPerTopic created using the member 
subscriptions.
+ *  2) Get a list of consumers (potentiallyUnfilled) that have not met the 
minimum required quota for assignment AND
+ * get a list of sticky partitions that we want to retain in the new 
assignment.
+ *  3) Add consumers from potentiallyUnfilled to Unfilled if they haven't 
met the total required quota = minQuota + (if necessary) extraPartition 
+ *  4) Get a list of available partitions by calculating the difference 
between total partitions and assigned sticky partitions 
+ *  5) Iterate through unfilled consumers and assign partitions from 
available partitions 
+ * 
+ * 
+ *
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Collections;
+
+import static java.lang.Math.min;
+
+public class ServerSideStickyRangeAssignor implements PartitionAssignor {
+
+public static final String RANGE_ASSIGNOR_NAME = "range-sticky";
+
+@Override
+public String name() {
+return RANGE_ASSIGNOR_NAME;
+}
+
+protected static <K, V> void putList(Map<K, List<V>> map, K key, V value) {
+List<V> list = map.computeIfAbsent(key, k -> new ArrayList<>());
+list.add(value);
+}
+
+protected static <K, V> void putSet(Map<K, Set<V>> map, K key, V value) {
+Set<V> set = map.computeIfAbsent(key, k -> new HashSet<>());
+set.add(value);
+}
+
+static class Pair<T, U> {
+private final T first;
+private final U second;
+
+public Pair(T first, U second) {
+this.first = first;
+this.second = second;
+}
+
+public T getFirst() {
+return first;
+}
+
+public U getSecond() {
+return second;
+}
+
+@Override
+public String toString() {
+return "(" + first + ", " + second + ")";
+}
+}
+
+// Returns a map of the list of consumers per Topic (keyed by topicId)
+private Map<Uuid, List<String>> consumersPerTopic(AssignmentSpec assignmentSpec) {

Review Comment:
   this is a method though so private is implicitly final right?
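(For context on the question above: yes, a private method in Java is never virtual. A subclass method with the same signature neither overrides nor hides it, so `final` on a private method is redundant. A minimal sketch, with hypothetical class names:)

```java
// Demonstrates that private methods are not virtual in Java:
// Sub.who() does not override Base.who(), so Base.describe()
// always invokes Base's own private who().
class Base {
    private String who() { return "Base"; }
    public String describe() { return who(); }
}

class Sub extends Base {
    private String who() { return "Sub"; } // unrelated method, not an override
}

class PrivateNotVirtual {
    public static void main(String[] args) {
        System.out.println(new Sub().describe()); // prints "Base"
    }
}
```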



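The `putList`/`putSet` helpers in the patch above are the standard `Map.computeIfAbsent` multimap idiom: create the bucket on first use, then append to it. A self-contained sketch (class name and example values are illustrative, not from the PR):

```java
import java.util.*;

class MultiMapHelpers {
    // Same shape as the patch's helpers: materialize the value collection
    // lazily, then add the new element to it.
    static <K, V> void putList(Map<K, List<V>> map, K key, V value) {
        map.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    static <K, V> void putSet(Map<K, Set<V>> map, K key, V value) {
        map.computeIfAbsent(key, k -> new HashSet<>()).add(value);
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> partitionsPerTopic = new HashMap<>();
        putList(partitionsPerTopic, "topic1", 0);
        putList(partitionsPerTopic, "topic1", 1);
        putList(partitionsPerTopic, "topic2", 0);
        // topic1 -> [0, 1], topic2 -> [0]
        System.out.println(partitionsPerTopic);
    }
}
```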



[GitHub] [kafka] gharris1727 commented on pull request #13466: MINOR: Fix base ConfigDef in AbstractHerder::connectorPluginConfig

2023-04-03 Thread via GitHub


gharris1727 commented on PR #13466:
URL: https://github.com/apache/kafka/pull/13466#issuecomment-1494694048

   @mimaison can you merge and backport this, as Chris is offline?





[GitHub] [kafka] jolshan commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jolshan commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156237705


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -45,10 +45,15 @@ abstract class InterBrokerSendThread(
 
   private val unsentRequests = new UnsentRequests
 
-  def generateRequests(): Iterable[RequestAndCompletionHandler]
+  private val requestManagers = new ArrayList[InterBrokerRequestManager]()

Review Comment:
   I think that would be handled in the managers right? I'm not sure what other 
synchronization we have here.






[GitHub] [kafka] jolshan commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jolshan commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156236037


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {
+  
+  var interBrokerSender: InterBrokerSender = _

Review Comment:
   Do we expect multiple places to set this?






[GitHub] [kafka] vamossagar12 commented on pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


vamossagar12 commented on PR #11433:
URL: https://github.com/apache/kafka/pull/11433#issuecomment-1494691660

   @guozhangwang , thanks for another round of review. Regarding system tests, 
is there a way to trigger via Jenkins ? (Sorry I am not aware of this).





[GitHub] [kafka] jolshan commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jolshan commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156235480


##
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala:
##
@@ -59,8 +61,9 @@ object TransactionCoordinator {
   time, metrics)
 
 val logContext = new LogContext(s"[TransactionCoordinator 
id=${config.brokerId}] ")
-val txnMarkerChannelManager = TransactionMarkerChannelManager(config, 
metrics, metadataCache, txnStateManager,
-  time, logContext)
+val txnMarkerChannelManager = new TransactionMarkerChannelManager(config, 
metadataCache, txnStateManager,
+  time)
+interBrokerSender.addRequestManager(txnMarkerChannelManager)

Review Comment:
   I had this before and Artem said to change it as it is now.  






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156233120


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {
+log.info("Couldn't commit any tasks since a rebalance is in 
progress");
+} else {
+log.info("Committed {} transactions", numCommitted);

Review Comment:
   done.






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156232890


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");

Review Comment:
   Oh I see. I read through the ticket and got some idea. Thanks for the 
explanation. I have updated the code to pass allTasks,






[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2023-04-03 Thread via GitHub


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r1156231874


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -342,7 +342,18 @@ public void handleAssignment(final Map<TaskId, Set<TopicPartition>> activeTasks,
 
 maybeThrowTaskExceptions(taskCloseExceptions);
 
-createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+final Collection<Task> newActiveTasks = createNewTasks(activeTasksToCreate, standbyTasksToCreate);
+// If there are any transactions in flight and there are newly created 
active tasks, commit the tasks
+// to avoid potential long restoration times.
+if (processingMode == EXACTLY_ONCE_V2 && 
threadProducer().transactionInFlight() && !newActiveTasks.isEmpty()) {
+log.info("New active tasks were added and there is an inflight 
transaction. Attempting to commit tasks.");
+final int numCommitted = 
commitTasksAndMaybeUpdateCommittableOffsets(newActiveTasks, new HashMap<>());
+if (numCommitted == -1) {

Review Comment:
   Thanks for this comment. IIUC, the suggestion is to move the 
`rebalanceInProgress == true` from 
`commitTasksAndMaybeUpdateCommittableOffsets` before creating new tasks and not 
create any new tasks or try to commit them if there's a rebalance in progress? 
   Also, does the check in `commitTasksAndMaybeUpdateCommittableOffsets` still 
remain? I see it is being called from a couple of other places as well like 
`handleCorruption` and `commit`. 






[GitHub] [kafka] fvaleri commented on pull request #13492: KAFKA-14752: Kafka examples improvements

2023-04-03 Thread via GitHub


fvaleri commented on PR #13492:
URL: https://github.com/apache/kafka/pull/13492#issuecomment-1494641695

   Hi @divijvaidya, I restored the original layout to make review easier.





[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-03 Thread via GitHub


philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156197845


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -2484,6 +2505,34 @@ public void 
shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
 );
 }
 
+@Test
+public void testRestoration_CheckpointWrittenWhenEOSDisabled() {

Review Comment:
   Thanks, i'm adopting your suggestion here.






[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-03 Thread via GitHub


philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156196422


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1692,43 +1699,45 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
 final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
 final ProcessorTopology topology = withRepartitionTopics(
-asList(source1, source2),
-mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
-singleton(repartition.topic())
+asList(source1, source2),
+mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
+singleton(repartition.topic())
 );
 consumer.assign(asList(partition1, repartition));
 consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
 // restoration checkpoint
+
EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
 
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
 EasyMock.replay(stateManager, recordCollector);
 
 final StreamsConfig config = createConfig();
 final InternalProcessorContext context = new ProcessorContextImpl(
-taskId,
-config,
-stateManager,
-streamsMetrics,
-null
+taskId,
+config,
+stateManager,
+streamsMetrics,
+null
 );
 
 task = new StreamTask(
-taskId,
-mkSet(partition1, repartition),
-topology,
-consumer,
-new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-streamsMetrics,
-stateDirectory,
-cache,
-time,
-stateManager,
-recordCollector,
-context,
-logContext);
+taskId,
+mkSet(partition1, repartition),
+topology,
+consumer,
+new TopologyConfig(null, config, new 
Properties()).getTaskConfig(),
+streamsMetrics,
+stateDirectory,
+cache,
+time,
+stateManager,
+recordCollector,
+context,
+logContext);

Review Comment:
   Sorry! I think I'm in the habit of using 8 spaces for "continuation 
indentation".






[GitHub] [kafka] SpacRocket commented on a diff in pull request #13491: KAFKA-14722: Make BooleanSerde public (an addition of upgrade.html changes)

2023-04-03 Thread via GitHub


SpacRocket commented on code in PR #13491:
URL: https://github.com/apache/kafka/pull/13491#discussion_r1156179615


##
docs/upgrade.html:
##
@@ -19,6 +19,75 @@
 
 

[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-03 Thread via GitHub


philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156165447


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1692,43 +1699,45 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
 final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
 final ProcessorTopology topology = withRepartitionTopics(
-asList(source1, source2),
-mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
-singleton(repartition.topic())
+asList(source1, source2),
+mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
+singleton(repartition.topic())

Review Comment:
   Ah, thanks. I think it's from auto-formatting by the IDE. I'll revert 
these indentation changes.






[GitHub] [kafka] philipnee commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-03 Thread via GitHub


philipnee commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1156164789


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##
@@ -255,6 +255,9 @@ public void completeRestoration(final 
java.util.function.Consumer

[jira] [Resolved] (KAFKA-14871) Kafka Connect - TTL not respected for Config Provider-provided connector configurations

2023-04-03 Thread Dave Sloan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14871?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dave Sloan resolved KAFKA-14871.

Resolution: Abandoned

After discussing with my colleagues, we have come to the conclusion that 
although the behaviour is incorrect, there's not actually a good reason for 
defining the secret providers inside the connector configuration. For security 
reasons it is better to define them in the environment (worker properties).
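
For readers landing here, the worker-level configuration that the resolution 
recommends would look roughly like the sketch below. It assumes the common 
Docker convention of mapping `CONNECT_`-prefixed environment variables to 
worker properties (prefix stripped, underscores mapped to dots); the keys are 
abbreviated and this is not a complete worker config:

```properties
# Worker properties (equivalent to the CONNECT_* environment variables below)
config.providers=vault
config.providers.vault.class=io.lenses.connect.secrets.providers.VaultSecretProvider
config.providers.vault.param.vault.addr=http://vault:8200
config.providers.vault.param.vault.auth.method=token
config.providers.vault.param.default.ttl=3
```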

> Kafka Connect - TTL not respected for Config Provider-provided connector 
> configurations
> ---
>
> Key: KAFKA-14871
> URL: https://issues.apache.org/jira/browse/KAFKA-14871
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.2
>Reporter: Dave Sloan
>Priority: Major
>
> When defining a configuration provider using environment variables (eg via 
> Docker), then a reload is scheduled according to the TTL of the returned 
> configuration.
>  
> Here is an example:
>  
> |Environment Variable|Value|
> |CONNECT_CONFIG_PROVIDERS_VAULT_PARAM_VAULT_ENGINE_VERSION|2|
> |CONNECT_CONFIG_PROVIDERS_VAULT_PARAM_VAULT_TOKEN|9c08104f-98b7-4bce-ab86-4a6c63897ec4|
> |CONNECT_CONFIG_PROVIDERS|vault|
> |CONNECT_CONFIG_PROVIDERS_VAULT_PARAM_DEFAULT_TTL|3|
> |CONNECT_CONFIG_PROVIDERS_VAULT_PARAM_VAULT_ADDR|http://vault:8200|
> |CONNECT_CONFIG_PROVIDERS_VAULT_PARAM_VAULT_AUTH_METHOD|token|
> |CONNECT_CONFIG_PROVIDERS_VAULT_PARAM_FILE_WRITE|false|
> |CONNECT_CONFIG_PROVIDERS_VAULT_CLASS|io.lenses.connect.secrets.providers.VaultSecretProvider|
>  
>  
> {code:java}
> {
>     "name": "testSink",
>     "config": {
>         "topics": "vaultTest",
>         "name": "testSink",
>         "key.converter": "org.apache.kafka.connect.storage.StringConverter",
>         "test.sink.secret.value": 
> "${vault:rotate-test/myVaultSecretPath:myVaultSecretKey}",
>         "value.converter": "org.apache.kafka.connect.storage.StringConverter",
>         "tasks.max": 1,
>     }
> }{code}
> And this is what we see in the logs:
> {code:java}
> STDOUT: [2023-03-30 07:46:43,204] INFO Scheduling a restart of connector 
> testSink in 4908 ms 
> (org.apache.kafka.connect.runtime.WorkerConfigTransformer){code}
> However when we try and achieve the same via connector properties, no restart 
> is scheduled
>  
>  
>  
> {code:java}
> {
>     "name": "testSink",
>     "config": {
>         "topics": "vaultTest",
>         "name": "testSink",
>         "key.converter": "org.apache.kafka.connect.storage.StringConverter",
>         "value.converter": "org.apache.kafka.connect.storage.StringConverter",
>         "tasks.max": 1,
>         "connector.class": "io.lenses.connect.secrets.test.TestSinkConnector",
>         "test.sink.secret.value": 
> "${vault:rotate-test/myVaultSecretPath:myVaultSecretKey}",
>         "config.providers": "vault",
>         "config.providers.vault.class": 
> "io.lenses.connect.secrets.providers.VaultSecretProvider",
>         "config.providers.vault.param.vault.engineversion": 2,
>         "config.providers.vault.param.vault.token": 
> "9c08104f-98b7-4bce-ab86-4a6c63897ec4",
>         "config.providers.vault.param.default.ttl": 3,
> "config.providers.vault.param.vault.addr": "http://vault:8200",
>         "config.providers.vault.param.vault.auth.method": "token",
>         "config.providers.vault.param.file.write": false
>     }
> }{code}
>  
>  
> Upon looking deeper into the code I can see that on line 239 of AbstractConfig
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java#L539]
>  
> {code:java}
> ConfigTransformerResult result = 
> configTransformer.transform(indirectVariables);{code}
> The result contains the TTLs.  However these are not used.
>  
> Expectation:
> TTLs should be used to schedule a restart of the connector so that the 
> behaviour is the same as if using environment properties.
>  
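
The expectation described above — use the returned TTLs to schedule a 
connector restart — can be illustrated with a small, purely hypothetical 
sketch. The class and method names below are invented for illustration and 
are not the Connect runtime's implementation; the point is only that the 
smallest TTL bounds when the earliest secret may rotate:

```java
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch only: names and structure are illustrative, not the
// Kafka Connect runtime. The TTLs returned by a config transformation say
// when each resolved secret may rotate, so a connector restart should be
// scheduled no later than the smallest TTL.
public class TtlRestartSketch {

    // Smallest TTL across all resolved config paths, or Long.MAX_VALUE if
    // the transformation returned no TTLs at all.
    static long minTtlMs(Map<String, Long> ttlsMs) {
        return ttlsMs.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MAX_VALUE);
    }

    // Schedule a restart callback after the smallest TTL, if any.
    static void scheduleRestart(ScheduledExecutorService scheduler,
                                Runnable restartConnector,
                                Map<String, Long> ttlsMs) {
        long delayMs = minTtlMs(ttlsMs);
        if (delayMs != Long.MAX_VALUE) {
            scheduler.schedule(restartConnector, delayMs, TimeUnit.MILLISECONDS);
        }
    }
}
```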



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14581) Move GetOffsetShell to tools

2023-04-03 Thread Ruslan Krivoshein (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14581?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17708011#comment-17708011
 ] 

Ruslan Krivoshein commented on KAFKA-14581:
---

Could this be assigned to me, please?

> Move GetOffsetShell to tools
> 
>
> Key: KAFKA-14581
> URL: https://issues.apache.org/jira/browse/KAFKA-14581
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Priority: Major
>






[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13443: KAFKA-14514: Add Server Side Sticky Range Assignor (KIP-848)

2023-04-03 Thread via GitHub


jeffkbkim commented on code in PR #13443:
URL: https://github.com/apache/kafka/pull/13443#discussion_r1156090957


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/ServerSideStickyRangeAssignorTest.java:
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.group.assignor;
+
+
+import org.apache.kafka.common.Uuid;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+import java.util.Set;
+
+
+
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class ServerSideStickyRangeAssignorTest {
+
+private final ServerSideStickyRangeAssignor assignor = new 
ServerSideStickyRangeAssignor();
+
+private final String topic1Name = "topic1";
+private final Uuid topic1Uuid = Uuid.randomUuid();
+
+private final String topic2Name = "topic2";
+private final Uuid topic2Uuid = Uuid.randomUuid();
+
+private final String topic3Name = "topic3";
+private final Uuid topic3Uuid = Uuid.randomUuid();
+
+private final String consumerA = "A";
+private final String consumerB = "B";
+private final String consumerC = "C";
+
+@Test
+public void testOneConsumerNoTopic() {
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+List<Uuid> subscribedTopics = new ArrayList<>();
+members.computeIfAbsent(consumerA, k -> new 
AssignmentMemberSpec(Optional.empty(), Optional.empty(), subscribedTopics, new 
HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment groupAssignment = assignor.assign(assignmentSpec);
+
+assertTrue(groupAssignment.getMembers().isEmpty());
+}
+
+@Test
+public void testFirstAssignmentTwoConsumersTwoTopicsSameSubscriptions() {
+// A -> T1, T3 // B -> T1, T3 // T1 -> 3 Partitions // T3 -> 2 
Partitions
+// Topics
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic3Uuid, new AssignmentTopicMetadata(topic3Name, 2));
+// Members
+Map<String, AssignmentMemberSpec> members = new HashMap<>();
+// Consumer A
+List<Uuid> subscribedTopicsA = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerA, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsA, new HashMap<>()));
+// Consumer B
+List<Uuid> subscribedTopicsB = new 
ArrayList<>(Arrays.asList(topic1Uuid, topic3Uuid));
+members.put(consumerB, new AssignmentMemberSpec(Optional.empty(), 
Optional.empty(), subscribedTopicsB, new HashMap<>()));
+
+AssignmentSpec assignmentSpec = new AssignmentSpec(members, topics);
+GroupAssignment computedAssignment = assignor.assign(assignmentSpec);
+
+Map<Uuid, Set<Set<Integer>>> expectedAssignment = new HashMap<>();
+// Topic 1 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Arrays.asList(0, 1)));
+expectedAssignment.computeIfAbsent(topic1Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(2)));
+// Topic 3 Partitions Assignment
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(0)));
+expectedAssignment.computeIfAbsent(topic3Uuid, k -> new 
HashSet<>()).add(new HashSet<>(Collections.singletonList(1)));
+
+assertAssignment(expectedAssignment, computedAssignment);
+}
+
+@Test
+public void 
testFirstAssignmentThreeConsumersThreeTopicsDifferentSubscriptions() {
+// A -> T1, T2 // B -> T3 // C -> T2, T3 // T1 -> 3 Partitions // T2 
-> 3 Partitions // T3 -> 2 Partitions
+// Topics
+Map<Uuid, AssignmentTopicMetadata> topics = new HashMap<>();
+topics.put(topic1Uuid, new AssignmentTopicMetadata(topic1Name, 3));
+topics.put(topic2Uuid, new AssignmentTopicMetadata(topic1Name, 3));

Review Comment:
   should this be topic2Name?
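
For context on the expected assignments asserted in this test (3 partitions 
and 2 consumers yielding [0, 1] and [2]), plain range assignment for a single 
topic is just a quotient/remainder split. A self-contained sketch — 
illustrative only, not the actual KIP-848 assignor code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of plain range assignment for one topic: each consumer
// gets numPartitions / numConsumers partitions, and the first
// (numPartitions % numConsumers) consumers each get one extra.
public class RangeSplitSketch {
    static List<List<Integer>> split(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<>();
        int quota = numPartitions / numConsumers;
        int remainder = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            // first `remainder` consumers absorb the leftover partitions
            int size = quota + (c < remainder ? 1 : 0);
            List<Integer> parts = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                parts.add(next++);
            }
            result.add(parts);
        }
        return result;
    }
}
```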



##

[GitHub] [kafka] dorwi opened a new pull request, #13494: MINOR: update the batch size connect task metrics documentation

2023-04-03 Thread via GitHub


dorwi opened a new pull request, #13494:
URL: https://github.com/apache/kafka/pull/13494

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] jeffkbkim commented on a diff in pull request #13463: KAFKA-14854: Refactor inter broker send thread to handle all interbroker requests on one thread

2023-04-03 Thread via GitHub


jeffkbkim commented on code in PR #13463:
URL: https://github.com/apache/kafka/pull/13463#discussion_r1156037401


##
core/src/main/scala/kafka/common/InterBrokerSender.scala:
##
@@ -156,6 +163,16 @@ abstract class InterBrokerSendThread(
   def wakeup(): Unit = networkClient.wakeup()
 }
 
+abstract class InterBrokerRequestManager() {

Review Comment:
   can the parenthesis be removed?






[GitHub] [kafka] divijvaidya commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-04-03 Thread via GitHub


divijvaidya commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1155911890


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -147,6 +152,16 @@ private List 
removeWhileMatching(Iterator

[GitHub] [kafka] Hangleton opened a new pull request, #13493: KAFKA-14852: Propagate Topic Ids to the Group Coordinator for Offset Fetch

2023-04-03 Thread via GitHub


Hangleton opened a new pull request, #13493:
URL: https://github.com/apache/kafka/pull/13493

   This task is the sibling of 
[PR-13378](https://github.com/apache/kafka/pull/13378) which propagates topic 
ids in the group coordinator on the offset commit (write) path. The purpose of 
this PR is to change the interfaces of the group coordinator and its adapter to 
propagate topic ids in a similar way on the offset fetch path.
   
   [KAFKA-14691](https://issues.apache.org/jira/browse/KAFKA-14691) will add 
the topic ids to the OffsetFetch API itself so that topic ids are propagated 
from clients to the coordinator on the offset fetch path. 
   
   Changes to the persisted data model (group metadata and keys) are out of 
scope.
   There is no functional change in this PR.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-03 Thread via GitHub


showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1155880909


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -359,6 +361,27 @@ public OptionalInt epochForOffset(long offset) {
 }
 }
 
+public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint 
leaderEpochCheckpoint) {
+lock.readLock().lock();
+try {
+leaderEpochCheckpoint.write(epochEntries());
+return new LeaderEpochFileCache(topicPartition, 
leaderEpochCheckpoint);
+} finally {
+lock.readLock().unlock();
+}
+}
+
+public OptionalInt findPreviousEpoch(int epoch) {

Review Comment:
   Any reason we don't use the existing `previousEpoch` method?






[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-04-03 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1155878999


##
storage/src/main/java/org/apache/kafka/storage/internals/checkpoint/InMemoryLeaderEpochCheckpoint.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.checkpoint;
+
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * This class stores a list of EpochEntry(LeaderEpoch + Offsets) to memory
+ */
+public class InMemoryLeaderEpochCheckpoint implements LeaderEpochCheckpoint {

Review Comment:
   >  I was wondering if it would be possible to copy the 
LeaderEpochCheckpointCache into an intermediate data structure (which doesn't 
have to be of type LeaderEpochCheckpointCache, hence decoupling it from the 
need to create a dummy InMemoryLeaderEpochCheckpoint ) and then using that 
intermediate data structure to extract the required Map (after whatever 
manipulation we want to do with it).
   
   That's a good idea. https://issues.apache.org/jira/browse/KAFKA-14877 is 
created for this improvement.
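
For readers following along, the class under discussion is conceptually tiny. 
A self-contained sketch of the idea — with simplified stand-ins for 
`EpochEntry` and the `LeaderEpochCheckpoint` read/write contract, not the 
actual classes in `org.apache.kafka.storage.internals`:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Rough sketch of an in-memory checkpoint: it buffers (epoch, startOffset)
// entries instead of flushing them to a file, so a caller can capture a
// snapshot of the leader-epoch cache without touching disk.
public class InMemoryCheckpointSketch {

    public static final class EpochEntry {
        public final int epoch;
        public final long startOffset;

        public EpochEntry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    private List<EpochEntry> entries = Collections.emptyList();

    // Analogous to LeaderEpochCheckpoint.write: replace the whole snapshot.
    public synchronized void write(List<EpochEntry> epochs) {
        this.entries = new ArrayList<>(epochs);
    }

    // Analogous to LeaderEpochCheckpoint.read: hand out a defensive copy.
    public synchronized List<EpochEntry> read() {
        return new ArrayList<>(entries);
    }
}
```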






[jira] [Created] (KAFKA-14877) refactor

2023-04-03 Thread Luke Chen (Jira)
Luke Chen created KAFKA-14877:
-

 Summary: refactor 
 Key: KAFKA-14877
 URL: https://issues.apache.org/jira/browse/KAFKA-14877
 Project: Kafka
  Issue Type: Sub-task
Reporter: Luke Chen


follow up with this comment: 
https://github.com/apache/kafka/pull/13456#discussion_r1154306477





[jira] [Updated] (KAFKA-14877) refactor InMemoryLeaderEpochCheckpoint

2023-04-03 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-14877:
--
Summary: refactor InMemoryLeaderEpochCheckpoint  (was: refactor )

> refactor InMemoryLeaderEpochCheckpoint
> --
>
> Key: KAFKA-14877
> URL: https://issues.apache.org/jira/browse/KAFKA-14877
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Luke Chen
>Priority: Major
>
> follow up with this comment: 
> https://github.com/apache/kafka/pull/13456#discussion_r1154306477





[GitHub] [kafka] showuon commented on a diff in pull request #13456: KAFKA-14850: introduce InMemoryLeaderEpochCheckpoint

2023-04-03 Thread via GitHub


showuon commented on code in PR #13456:
URL: https://github.com/apache/kafka/pull/13456#discussion_r1155876324


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -383,21 +398,25 @@ public void clear() {
 
 // Visible for testing
 public List epochEntries() {
-lock.writeLock().lock();
+lock.readLock().lock();
 try {
 return new ArrayList<>(epochs.values());
 } finally {
-lock.writeLock().unlock();
+lock.readLock().unlock();
 }
 }
 
-private void flush() {
+private void flushTo(LeaderEpochCheckpoint leaderEpochCheckpoint, 
Collection epochEntries) {
 lock.readLock().lock();
 try {
-checkpoint.write(epochs.values());
+leaderEpochCheckpoint.write(epochEntries);
 } finally {
 lock.readLock().unlock();
 }
 }
 
+private void flush() {
+flushTo(this.checkpoint, epochs.values());

Review Comment:
   For the "removing" lock for `flushTo`, good point, removed.






[GitHub] [kafka] hudeqi commented on pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-04-03 Thread via GitHub


hudeqi commented on PR #13473:
URL: https://github.com/apache/kafka/pull/13473#issuecomment-1494104389

   > Added a nit, otherwise code looks good to me. As a general suggestion for 
this and future PRs, it helps the reviewer if your description has the 
following sections:
   > 
   > ```
   > Motivation (explaining why this change is required)
   > Code change (what actually is being changed in the code)
   > Testing (how did you validate that the code changes are correct)
   > ```
   > 
   > This PR is ready for a committer to take a look at it.
   
   I see, thanks a lot!





[GitHub] [kafka] divijvaidya commented on pull request #13492: KAFKA-14752: Kafka examples improvements

2023-04-03 Thread via GitHub


divijvaidya commented on PR #13492:
URL: https://github.com/apache/kafka/pull/13492#issuecomment-1494096264

   May I suggest having different PRs for package change and the actual code 
changes? It would help view the diff of changes clearly.





[GitHub] [kafka] cadonna commented on a diff in pull request #13269: KAFKA-12634 enforce checkpoint after restoration

2023-04-03 Thread via GitHub


cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1155775071


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1692,43 +1699,45 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
 final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
 final ProcessorTopology topology = withRepartitionTopics(
-asList(source1, source2),
-mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
-singleton(repartition.topic())
+asList(source1, source2),
+mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
+singleton(repartition.topic())
 );
 consumer.assign(asList(partition1, repartition));
 consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
 // restoration checkpoint
+
EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
 
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
 EasyMock.replay(stateManager, recordCollector);
 
 final StreamsConfig config = createConfig();
 final InternalProcessorContext context = new ProcessorContextImpl(
-taskId,
-config,
-stateManager,
-streamsMetrics,
-null
+taskId,
+config,
+stateManager,
+streamsMetrics,
+null

Review Comment:
   Indentation of 4 is fine. Could you revert this change, please?



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1546,8 +1551,9 @@ public void 
shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
 stateManager.checkpoint();
 EasyMock.expectLastCall().once();
 EasyMock.expect(stateManager.changelogOffsets())
-.andReturn(singletonMap(changelogPartition, 10L))
-.andReturn(singletonMap(changelogPartition, 20L));
+.andReturn(singletonMap(changelogPartition, 10L)) // 
restoration checkpoint
+.andReturn(singletonMap(changelogPartition, 10L))
+.andReturn(singletonMap(changelogPartition, 20L));

Review Comment:
   ```suggestion
   .andReturn(singletonMap(changelogPartition, 10L)) // restoration 
checkpoint
   .andReturn(singletonMap(changelogPartition, 10L))
   .andReturn(singletonMap(changelogPartition, 20L));
   ```



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -1692,43 +1699,45 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
 final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
 final ProcessorTopology topology = withRepartitionTopics(
-asList(source1, source2),
-mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
-singleton(repartition.topic())
+asList(source1, source2),
+mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
+singleton(repartition.topic())

Review Comment:
   Could you revert this change, please? An indentation of 4 is fine.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -2484,6 +2505,34 @@ public void 
shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
 );
 }
 
+@Test
+public void testRestoration_CheckpointWrittenWhenEOSDisabled() {

Review Comment:
   We usually use the form `should...` for test names. My proposal here is to 
use `shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled`.



##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##
@@ -2484,6 +2505,34 @@ public void 
shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
 );
 }
 
+@Test
+public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+EasyMock.expect(stateManager.changelogOffsets())
+.andReturn(singletonMap(partition1, 0L)); // restoration 
checkpoint
+
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+stateManager.checkpoint();
+EasyMock.expectLastCall().once(); // checkpoint should only be called 
once
+EasyMock.replay(stateManager, recordCollector);
+
+task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+task.initializeIfNeeded();
+

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13491: KAFKA-14722: Make BooleanSerde public (an addition of upgrade.html changes)

2023-04-03 Thread via GitHub


divijvaidya commented on code in PR #13491:
URL: https://github.com/apache/kafka/pull/13491#discussion_r1155795184


##
docs/upgrade.html:
##
@@ -19,6 +19,75 @@
 
 

[GitHub] [kafka] divijvaidya commented on a diff in pull request #13473: KAFKA-14866:Remove controller module metrics when broker is shutting down

2023-04-03 Thread via GitHub


divijvaidya commented on code in PR #13473:
URL: https://github.com/apache/kafka/pull/13473#discussion_r1155781010


##
core/src/test/scala/unit/kafka/controller/KafkaControllerTest.scala:
##
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.controller
+
+import kafka.server.metadata.ZkMetadataCache
+import kafka.server.{BrokerFeatures, DelegationTokenManager, KafkaConfig}
+import kafka.utils.TestUtils
+import kafka.zk.{BrokerInfo, KafkaZkClient}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.ArgumentMatchers.{any, anyString}
+import org.mockito.Mockito.{mock, mockConstruction, times, verify, 
verifyNoMoreInteractions}
+
+class KafkaControllerTest {
+  var config: KafkaConfig = _
+
+  @BeforeEach
+  def setUp(): Unit = {
+val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+
+  }

Review Comment:
   nit
   
   This could perhaps be removed






[GitHub] [kafka] mimaison merged pull request #13325: KAFKA-14771: Include threads info in ConcurrentModificationException message

2023-04-03 Thread via GitHub


mimaison merged PR #13325:
URL: https://github.com/apache/kafka/pull/13325





[GitHub] [kafka] pierDipi commented on a diff in pull request #13325: KAFKA-14771: Include threads info in ConcurrentModificationException message

2023-04-03 Thread via GitHub


pierDipi commented on code in PR #13325:
URL: https://github.com/apache/kafka/pull/13325#discussion_r1155728488


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -2547,9 +2547,13 @@ private void acquireAndEnsureOpen() {
  * @throws ConcurrentModificationException if another thread already has the lock
  */
 private void acquire() {
-long threadId = Thread.currentThread().getId();
+final Thread thread = Thread.currentThread();
+final long threadId = thread.getId();
 if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
-throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
+throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
+"currentThread(name: " + thread.getName() + ", id: " + thread.getId() + ")" +

Review Comment:
   ```suggestion
   "currentThread(name: " + thread.getName() + ", id: " + threadId + ")" +
   ```
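For illustration only, the guard with the suggestion applied can be sketched as a self-contained class. `GuardedResource`, `release()`, and the message text are stand-ins for the KafkaConsumer internals, not the actual API:

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicLong;

// Stand-in for KafkaConsumer's single-threaded-access guard (names are assumptions).
class GuardedResource {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);

    void acquire() {
        final Thread thread = Thread.currentThread();
        final long threadId = thread.getId();
        // The CAS claims ownership; re-entrant calls from the owning thread pass the first check.
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("not safe for multi-threaded access. " +
                "currentThread(name: " + thread.getName() + ", id: " + threadId + ")");
    }

    void release() {
        currentThread.set(NO_CURRENT_THREAD);
    }
}
```

Reusing the already-computed `threadId` in the message, as suggested, avoids a redundant `getId()` call and guarantees the reported id is the same value the CAS compared.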






[jira] [Commented] (KAFKA-14746) Throwing in Connector.taskConfigs in distributed mode generates a lot of logs

2023-04-03 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707886#comment-17707886
 ] 

Mickael Maison commented on KAFKA-14746:


To be honest I'm not sure if we should change this behavior or simply document 
it. I tend to agree that the retry logic was likely intended to handle failures 
communicating with the leader instead of exceptions from taskConfigs(). 

> Throwing in Connector.taskConfigs in distributed mode generates a lot of logs
> -
>
> Key: KAFKA-14746
> URL: https://issues.apache.org/jira/browse/KAFKA-14746
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Priority: Major
>
> If a Connector throws in its taskConfigs() method, the runtime ends up 
> retrying using DistributedHerder.RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS which 
> is a fixed value (250ms). For each retry, the runtime prints the connector 
> configuration and the enriched configuration so this can quickly generate a 
> lot of logs.
> There is some value in throwing in taskConfigs() as it allows the connector to
> fail fast in case it is given bad credentials. For example, this is what some of
> the Debezium connectors do: 
> https://github.com/debezium/debezium/blob/main/debezium-connector-sqlserver/src/main/java/io/debezium/connector/sqlserver/SqlServerConnector.java#L56-L69
> The way Connectors are expected to work today is to instead always create 
> tasks and let each task fail in case the configuration is wrong. We should 
> document that and make it clear in the javadoc that throwing in taskConfigs 
> is not recommended.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] mimaison commented on a diff in pull request #13325: KAFKA-14771: Include threads info in ConcurrentModificationException message

2023-04-03 Thread via GitHub


mimaison commented on code in PR #13325:
URL: https://github.com/apache/kafka/pull/13325#discussion_r1155674207


##
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java:
##
@@ -2547,9 +2547,13 @@ private void acquireAndEnsureOpen() {
  * @throws ConcurrentModificationException if another thread already has the lock
  */
 private void acquire() {
-long threadId = Thread.currentThread().getId();
+final Thread thread = Thread.currentThread();
+final long threadId = thread.getId();
 if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
-throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
+throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access. " +
+"currentThread(name: " + thread.getName() + ", id: " + thread.getId() + ")" +

Review Comment:
   Can we use `threadId` here?






[jira] [Resolved] (KAFKA-14450) Rack-aware partition assignment for consumers (KIP-881)

2023-04-03 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-14450.

Resolution: Fixed

> Rack-aware partition assignment for consumers (KIP-881)
> ---
>
> Key: KAFKA-14450
> URL: https://issues.apache.org/jira/browse/KAFKA-14450
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> Top-level ticket for KIP-881 since we are splitting the PR into 3.





[jira] [Resolved] (KAFKA-14452) Make sticky assignors rack-aware if consumer racks are configured.

2023-04-03 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14452?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-14452.

Fix Version/s: 3.5.0
 Reviewer: David Jacot
   Resolution: Fixed

> Make sticky assignors rack-aware if consumer racks are configured.
> --
>
> Key: KAFKA-14452
> URL: https://issues.apache.org/jira/browse/KAFKA-14452
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 3.5.0
>
>
> See KIP-881 for details
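As a rough illustration of the idea behind KIP-881 (not the actual sticky-assignor code, which additionally preserves stickiness and balance), a rack-aware assignment prefers consumers whose rack hosts a replica of the partition. The simplified model below, including all names, is an assumption:

```java
import java.util.*;

// Simplified model: each partition knows the racks of its replicas; each consumer has a rack.
class RackAwareSketch {
    // Returns the chosen consumer for each partition, preferring same-rack consumers
    // and falling back to round-robin when no rack matches.
    static Map<String, String> assign(Map<String, Set<String>> partitionReplicaRacks,
                                      Map<String, String> consumerRacks) {
        Map<String, String> assignment = new LinkedHashMap<>();
        List<String> consumers = new ArrayList<>(consumerRacks.keySet());
        int next = 0;
        for (Map.Entry<String, Set<String>> e : partitionReplicaRacks.entrySet()) {
            String chosen = null;
            for (String c : consumers) {            // first pass: rack match
                if (e.getValue().contains(consumerRacks.get(c))) { chosen = c; break; }
            }
            if (chosen == null) {                    // fallback: round-robin
                chosen = consumers.get(next % consumers.size());
            }
            next++;
            assignment.put(e.getKey(), chosen);
        }
        return assignment;
    }
}
```

Matching a consumer's rack to a replica's rack lets fetches be served from a same-rack replica, which is the traffic-cost motivation behind the KIP.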





[GitHub] [kafka] rajinisivaram merged pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)

2023-04-03 Thread via GitHub


rajinisivaram merged PR #13350:
URL: https://github.com/apache/kafka/pull/13350





[GitHub] [kafka] rajinisivaram commented on pull request #13350: KAFKA-14452: Make sticky assignors rack-aware if client rack is configured (KIP-881)

2023-04-03 Thread via GitHub


rajinisivaram commented on PR #13350:
URL: https://github.com/apache/kafka/pull/13350#issuecomment-1493889009

   @dajac Thanks for the reviews. Test failures not related, merging to trunk.





[GitHub] [kafka] emissionnebula commented on pull request #13437: KAFKA-14828: Remove R/W locks using persistent data structures

2023-04-03 Thread via GitHub


emissionnebula commented on PR #13437:
URL: https://github.com/apache/kafka/pull/13437#issuecomment-1493821132

   > Since you mentioned, "authorizer is quite read-heavy with very infrequent writes", and given the risk highlighted above, would having slow writes with CopyOnWrite be an acceptable trade-off?
   
   There is another [PR](https://github.com/apache/kafka/pull/13280) by @rondagostino that evaluates these libraries for Kraft metadata image change performance. That code currently uses the CopyOnWrite approach. The persistent libraries are orders of magnitude faster than the existing code - CopyOnWrite = ~31sec/op vs. PCollection = 509 ns/op. 
   
   > let's say we end up using a dependency that is maintained by a single 
person and they decide to stop maintaining it.
   
   I totally agree with this risk, but the gain from these libraries is quite significant, so maybe we can accept it as part of the trade-off.
   
   > That leaves us with a choice to either overhaul our code base and remove 
all usages of that library (which I expect to be very intrusive for locking 
data structures) or take over ownership of something that we as a community are 
not experts in.
   
   We can probably minimize the risk of changing the code at all usage points 
by defining interfaces and wrapper classes in our code to standardize the usage 
of persistent collections. In the future, if we decide to move away from one 
library to another then it wouldn't require changing the code everywhere.
   
   cc - @ijuma @mimaison @cmccabe @omkreddy @dajac, what are your views on this?
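For context, the copy-on-write pattern being compared here can be sketched generically (this is not the authorizer's actual data structure): reads are lock-free against an immutable snapshot, while every write pays a full O(n) copy, which is where the seconds-vs-nanoseconds gap above comes from.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Generic copy-on-write store: readers never block; every write copies the whole map.
class CopyOnWriteStore<K, V> {
    private volatile Map<K, V> snapshot = Collections.emptyMap();

    synchronized void put(K key, V value) {
        Map<K, V> next = new HashMap<>(snapshot);   // O(n) copy on each write
        next.put(key, value);
        snapshot = Collections.unmodifiableMap(next);
    }

    V get(K key) {
        return snapshot.get(key);                    // lock-free read of an immutable snapshot
    }
}
```

A persistent collection replaces the full copy with structural sharing between versions, so writes stay cheap while reads keep the same snapshot semantics.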





[jira] [Commented] (KAFKA-14868) Remove some forgotten metrics when the replicaManager is closed

2023-04-03 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707796#comment-17707796
 ] 

hudeqi commented on KAFKA-14868:


The problem of metrics not being closed in the ReplicaManager has been solved in the patch. As for how to improve KafkaMetricsGroup so that such issues can be detected more generally, there is no conclusive solution yet.

> Remove some forgotten metrics when the replicaManager is closed
> ---
>
> Key: KAFKA-14868
> URL: https://issues.apache.org/jira/browse/KAFKA-14868
> Project: Kafka
>  Issue Type: Improvement
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>
> In some classes, metrics are registered on load, but it often happens that
> some or all of them are forgotten and never removed on close. For example, in
> the ReplicaManager class, IsrExpandsPerSec, IsrShrinksPerSec, and
> FailedIsrUpdatesPerSec were all left unremoved.
> Therefore, add counting capability to KafkaMetricsGroup. Whenever a
> KafkaMetricsGroup method adds or removes a metric, it increments the
> corresponding counter, so that the unit test of a class can compare the two
> counts and detect any metric that was registered but never removed.
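The counting idea in the description could look roughly like the following wrapper; this is a hypothetical sketch, and the real KafkaMetricsGroup API differs:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical counting wrapper: every add/remove bumps a counter so a unit test
// can assert that a class removed everything it registered.
class CountingMetricsGroup {
    private final AtomicInteger added = new AtomicInteger();
    private final AtomicInteger removed = new AtomicInteger();

    void newMeter(String name) {
        added.incrementAndGet();     // registration of the real metric would happen here
    }

    void removeMetric(String name) {
        removed.incrementAndGet();   // deregistration of the real metric would happen here
    }

    // True once every registered metric has also been removed.
    boolean allMetricsRemoved() {
        return added.get() == removed.get();
    }
}
```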





[jira] [Updated] (KAFKA-14868) Remove some forgotten metrics when the replicaManager is closed

2023-04-03 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-14868:
---
Summary: Remove some forgotten metrics when the replicaManager is closed  
(was: Add metric counting capability to KafkaMetricsGroup to facilitate 
checking that the metric is missed when it is closed)

> Remove some forgotten metrics when the replicaManager is closed
> ---
>
> Key: KAFKA-14868
> URL: https://issues.apache.org/jira/browse/KAFKA-14868
> Project: Kafka
>  Issue Type: Improvement
>Reporter: hudeqi
>Assignee: hudeqi
>Priority: Major
>
> In some classes, metrics are registered on load, but it often happens that
> some or all of them are forgotten and never removed on close. For example, in
> the ReplicaManager class, IsrExpandsPerSec, IsrShrinksPerSec, and
> FailedIsrUpdatesPerSec were all left unremoved.
> Therefore, add counting capability to KafkaMetricsGroup. Whenever a
> KafkaMetricsGroup method adds or removes a metric, it increments the
> corresponding counter, so that the unit test of a class can compare the two
> counts and detect any metric that was registered but never removed.





[GitHub] [kafka] fvaleri opened a new pull request, #13492: KAFKA-14752: Kafka examples improvements

2023-04-03 Thread via GitHub


fvaleri opened a new pull request, #13492:
URL: https://github.com/apache/kafka/pull/13492

   This is an attempt to improve the Kafka examples module.
   
   This includes some heavy refactoring and minor fixes, but the original logic and script interface are kept. Below is the list of major changes and fixes:
   
   - Make scripts shellcheck compliant
   - Convert the readme to markdown as in other modules
   - Remove unnecessary dependency on server-common module
   - Move all classes from kafka.examples to org.apache.kafka.examples
   - Add missing and improve existing javadoc
   - Fix not enough replicas when default minISR>1
   - Add producer and consumer client id config
   - Improve log formatting and avoid printing all messages
   - Fix producer consumer driver hanging on timeout
   - Remove constructor logic and use try-with-resources
   - Improve EOS example and processor
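As an illustration of the "remove constructor logic and use try-with-resources" item, the pattern looks like this; `StubClient` is a stand-in `AutoCloseable`, since the real examples wrap KafkaProducer/KafkaConsumer:

```java
// Stand-in client; the real examples close KafkaProducer/KafkaConsumer the same way.
class StubClient implements AutoCloseable {
    boolean closed = false;
    void send(String record) { /* no-op for the sketch */ }
    @Override public void close() { closed = true; }
}

class TryWithResourcesExample {
    static StubClient run() {
        StubClient observed;
        // try-with-resources guarantees close() even if send() throws,
        // removing the need for cleanup logic in constructors or finally blocks.
        try (StubClient client = new StubClient()) {
            observed = client;
            client.send("hello");
        }
        return observed;
    }
}
```

Keeping resource acquisition out of constructors also makes the examples safe against leaks when construction of a later resource fails.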
   

