[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597424785



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
   Yeah I agree, I filed a ticket to improve the locking mechanism a little 
while back. Just wanted to keep this PR focused.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rodesai commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


rodesai commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597413503



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
   Never mind, I see this isn't a change from what was happening before. Still 
seems good to do, but not necessarily in this patch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rodesai commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


rodesai commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597413180



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
   Seems dangerous to use the thread name to check ownership. Might be 
safer to check reference equality on `Thread`.
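
   A minimal sketch of that reference-based alternative (the class and field 
names here are hypothetical, not the actual StateDirectory members):

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: track the owning Thread object rather than its name,
// so two threads that happen to share a name can never be confused.
class TaskLocks {
    private final Map<String, Thread> lockedTasksToOwner = new HashMap<>(); // keyed by taskId

    synchronized boolean lock(final String taskId) {
        final Thread owner = lockedTasksToOwner.get(taskId);
        if (owner != null) {
            // Reference equality: true only for the exact same Thread instance.
            return owner == Thread.currentThread();
        }
        lockedTasksToOwner.put(taskId, Thread.currentThread());
        return true;
    }

    synchronized void unlock(final String taskId) {
        // Removes the lock only if the calling thread actually owns it.
        lockedTasksToOwner.remove(taskId, Thread.currentThread());
    }
}
{code}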




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12508) Emit-on-change tables may lose updates on error or restart in at_least_once

2021-03-18 Thread Nico Habermann (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Habermann updated KAFKA-12508:
---
Description: 
[KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]
 added emit-on-change semantics to KTables that suppress updates for duplicate 
values.

However, this may cause data loss in at_least_once topologies when records are 
retried from the last commit due to an error / restart / etc.

 

Consider the following example:
{code:java}
streams.table(source, materialized)
.toStream()
.map(mayThrow())
.to(output){code}
 
 # Record A gets read
 # Record A is stored in the table
 # The update for record A is forwarded through the topology
 # Map() throws (or alternatively, any restart while the forwarded update was 
still being processed and not yet produced to the output topic)
 # The stream is restarted and "retries" from the last commit
 # Record A gets read again
 # The table will discard the update for record A because
 ## The value is the same
 ## The timestamp is the same
 # Eventually the stream will commit
 # There is absolutely no output for Record A even though we're running in 
at_least_once

 

This behaviour does not seem intentional. [The emit-on-change logic explicitly 
forwards records that have the same value and an older 
timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
 This logic should probably be changed to also forward updates that have an 
older *or equal* timestamp.
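
For illustration, the proposed rule could look like the following sketch 
(method and parameter names are illustrative, not the actual 
ValueAndTimestampSerializer code):

{code:java}
import java.util.Arrays;

class EmitOnChangeSketch {
    // Illustrative only: drop an update only when the serialized values match
    // AND the new timestamp is strictly newer, so a same-timestamp retry after
    // a restart is forwarded again under at_least_once.
    static boolean shouldForward(final byte[] oldValue, final long oldTimestamp,
                                 final byte[] newValue, final long newTimestamp) {
        if (!Arrays.equals(oldValue, newValue)) {
            return true; // value changed: always emit
        }
        // Proposed: forward on an older *or equal* timestamp; the current
        // logic forwards only on a strictly older one.
        return newTimestamp <= oldTimestamp;
    }
}
{code}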

  was:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]

Added emit-on-change semantics to KTables that suppress duplicate values.

However, this may cause data loss in at_least_once topologies when records are 
retried from the last commit due to an error / restart / etc.

 

Consider the following example:

 
{code:java}
streams.table(source, materialized)
.toStream()
.map(mayThrow())
.to(output){code}
 

 
 # Record A gets read
 # Record A is stored in the table
 # The update for record A is forwarded through the topology
 # Map() throws (or alternatively, any restart while the forwarded update was 
still being processed and not yet produced to the output topic)
 # The stream is restarted and "retries" from the last commit
 # Record A gets read again
 # The table will discard the update for record A because
 ## The value is the same
 ## The timestamp is the same
 # Eventually the stream will commit
 # There is absolutely no output for Record A even though we're running in 
at_least_once

 

This behaviour does not seem intentional. [The emit-on-change logic explicitly 
forwards records that have the same value and an older 
timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
 This logic should probably be changed to also forward updates that have an 
older *or equal* timestamp.


> Emit-on-change tables may lose updates on error or restart in at_least_once
> ---
>
> Key: KAFKA-12508
> URL: https://issues.apache.org/jira/browse/KAFKA-12508
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0, 2.6.1
>Reporter: Nico Habermann
>Priority: Major
>
> [KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]
>  added emit-on-change semantics to KTables that suppress updates for 
> duplicate values.
> However, this may cause data loss in at_least_once topologies when records 
> are retried from the last commit due to an error / restart / etc.
>  
> Consider the following example:
> {code:java}
> streams.table(source, materialized)
> .toStream()
> .map(mayThrow())
> .to(output){code}
>  
>  # Record A gets read
>  # Record A is stored in the table
>  # The update for record A is forwarded through the topology
>  # Map() throws (or alternatively, any restart while the forwarded update was 
> still being processed and not yet produced to the output topic)
>  # The stream is restarted and "retries" from the last commit
>  # Record A gets read again
>  # The table will discard the update for record A because
>  ## The value is the same
>  ## The timestamp is the same
>  # Eventually the stream will commit
>  # There is absolutely no output for Record A even though we're running in 
> at_least_once
>  
> This behaviour does not seem intentional. [The emit-on-change logic 
> explicitly forwards records that have the same value and an older 
> timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
>  This logic should probably be changed to also forward updates that have an 
> older *or equal* timestamp.

[jira] [Created] (KAFKA-12508) Emit-on-change tables may lose updates on error or restart in at_least_once

2021-03-18 Thread Nico Habermann (Jira)
Nico Habermann created KAFKA-12508:
--

 Summary: Emit-on-change tables may lose updates on error or 
restart in at_least_once
 Key: KAFKA-12508
 URL: https://issues.apache.org/jira/browse/KAFKA-12508
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.1, 2.7.0
Reporter: Nico Habermann


[https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]

Added emit-on-change semantics to KTables that suppress duplicate values.

However, this may cause data loss in at_least_once topologies when records are 
retried from the last commit due to an error / restart / etc.

 

Consider the following example:

 
{code:java}
streams.table(source, materialized)
.toStream()
.map(mayThrow())
.to(output){code}
 

 
 # Record A gets read
 # Record A is stored in the table
 # The update for record A is forwarded through the topology
 # Map() throws (or alternatively, any restart while the forwarded update was 
still being processed and not yet produced to the output topic)
 # The stream is restarted and "retries" from the last commit
 # Record A gets read again
 # The table will discard the update for record A because
 ## The value is the same
 ## The timestamp is the same
 # Eventually the stream will commit
 # There is absolutely no output for Record A even though we're running in 
at_least_once

 

This behaviour does not seem intentional. [The emit-on-change logic explicitly 
forwards records that have the same value and an older 
timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50]
 This logic should probably be changed to also forward updates that have an 
older *or equal* timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #10357: MINOR: KIP-500 Cluster ID is a String

2021-03-18 Thread GitBox


cmccabe commented on pull request #10357:
URL: https://github.com/apache/kafka/pull/10357#issuecomment-802556947


   @vvcephei: I like @hachikuji's idea of merging this to 2.8. It's worth 
noting that the protocol we're changing here is a new one introduced by 
KIP-500, and it is not used in the ZK code path. So the risk to existing code 
should be low.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596443720



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -186,7 +186,7 @@ public ProcessorStateManager(final TaskId taskId,
 this.changelogReader = changelogReader;
 this.sourcePartitions = sourcePartitions;
 
-this.baseDir = stateDirectory.directoryForTask(taskId);
+this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);

Review comment:
   Note: the only logical changes are in `StateDirectory` and 
`KafkaStreams`; the rest of the files were just touched by renaming this method 
to include the `getOrCreate` prefix. Sorry for the expanded surface area; I 
felt the renaming was merited since otherwise this critical functionality is 
easy to miss.
   
   There are also some non-renaming changes in StateManagerUtil and TaskManager 
that just involve removing the try-catch logic for the IOException that can no 
longer be thrown. For tests, you should focus on `StateDirectoryTest` and 
`StateManagerUtilTest`.
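
   For illustration, the get-or-create contract the new name is meant to 
surface might look like this (a sketch under assumed names, not the actual 
StateDirectory implementation):

{code:java}
import java.io.File;

class StateDirectorySketch {
    // Sketch: the call has a side effect (creating the directory on first use)
    // that the old name `directoryForTask` hid from the caller.
    static File getOrCreateDirectoryForTask(final File stateDir, final String taskId) {
        final File taskDir = new File(stateDir, taskId);
        if (!taskDir.exists() && !taskDir.mkdirs()) {
            // The real code throws a Streams-specific exception here.
            throw new RuntimeException("Could not create task directory " + taskDir);
        }
        return taskDir;
    }
}
{code}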




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10563) Make sure task directories don't remain locked by dead threads

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304637#comment-17304637
 ] 

A. Sophie Blee-Goldman commented on KAFKA-10563:


I partially addressed this on the side in another PR: 
https://github.com/apache/kafka/pull/10342#issuecomment-802542776

We now clean up any orphaned task directories when the cleaner thread runs. As 
mentioned above this is not perfect, since the task directory could remain 
blocked for up to 10 minutes (by default), which in the current architecture 
would also block progress on any other task assigned to that StreamThread. It's 
better than nothing, but we should still follow up and make sure it's not 
possible to leave locked task directories behind.

> Make sure task directories don't remain locked by dead threads
> --
>
> Key: KAFKA-10563
> URL: https://issues.apache.org/jira/browse/KAFKA-10563
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> Most common/expected exceptions within Streams are handled gracefully, and 
> the thread will make sure to clean up all resources such as task locks during 
> shutdown. However, there are some instances where an unexpected exception 
> such as an IllegalStateException can leave some resources orphaned.
> We have seen this happen to task directories after an IllegalStateException 
> is hit during the TaskManager's rebalance handling logic – the Thread shuts 
> down, but loses track of some tasks before unlocking them. This blocks any 
> further work on that task by any other thread in the same instance.
> Previously we decided that this was "ok" because an IllegalStateException 
> means all bets are off. But with the upcoming work of KIP-663 and KIP-671, 
> users will be able to react smartly on dying threads and replace them with 
> new ones, making it more important than ever to ensure that the application 
> can continue on with no lasting repercussions of a thread death. If we allow 
> users to revive/replace a thread that dies due to IllegalStateException, that 
> thread should not be blocked from doing any work by the ghost of its 
> predecessor. 
> It might be easiest to just add some logic to the cleanup thread to verify 
> all the existing locks against the list of live threads, and remove any 
> zombie locks. But we probably want to do this purging more frequently than 
> the cleanup thread runs (10min by default) – so maybe we can leverage the 
> work in KIP-671 and have each thread purge any locks still owned by it after 
> the uncaught exception handler runs, but before the thread dies.
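
A rough sketch of that purge idea (the map, the live-thread set, and the method 
name are all hypothetical):

{code:java}
import java.util.Map;
import java.util.Set;

class LockJanitorSketch {
    // Hypothetical: any lock whose owner is no longer a live StreamThread is
    // treated as a zombie and released, so other threads on the same instance
    // can pick up the task.
    static void purgeZombieLocks(final Map<String, Thread> lockedTasksToOwner,
                                 final Set<Thread> liveStreamThreads) {
        lockedTasksToOwner.entrySet().removeIf(entry -> {
            final Thread owner = entry.getValue();
            return !owner.isAlive() || !liveStreamThreads.contains(owner);
        });
    }
}
{code}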



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#issuecomment-802542776


   @wcarlson5 I ended up putting a minor guard against orphaned task 
directories into this PR. It's not perfect, but basically we just let the 
cleaner thread verify that any locked task directories belong to one of the 
currently running StreamThreads. Let me know what you think.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10355: KAFKA-12500: fix memory leak in thread cache

2021-03-18 Thread GitBox


ableegoldman commented on pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#issuecomment-802535398


   Alright, I added two integration tests; lmk if this looks good @vvcephei 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API

2021-03-18 Thread GitBox


chia7712 commented on a change in pull request #10275:
URL: https://github.com/apache/kafka/pull/10275#discussion_r597383123



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+public interface AdminApiHandler<K, V> {
+
+/**
+ * Get a user-friendly name for the API this handler is implementing.
+ */
+String apiName();
+
+/**
+ * Initialize the set of keys required to handle this API and how the 
driver
+ * should map them to the broker that will handle the request for these 
keys.
+ *
+ * Two mapping types are supported:
+ *
+ * - Static mapping: when the brokerId is known ahead of time
+ * - Dynamic mapping: when the brokerId must be discovered dynamically
+ *
+ * @return the key mappings
+ */
+KeyMappings<K> initializeKeys();
+
+/**
+ * Build the fulfillment request. The set of keys are derived during the 
Lookup stage
+ * as the set of keys which all map to the same destination broker.
+ *
+ * @param brokerId the target brokerId for the request
+ * @param keys the set of keys that should be handled by this request
+ *
+ * @return a builder for the request containing the given keys
+ */
+AbstractRequest.Builder<?> buildRequest(Integer brokerId, Set<K> keys);
+
+/**
+ * Callback that is invoked when a Fulfillment request returns 
successfully.
+ * The handler should parse the response, check for errors, and return a
+ * result which indicates which keys (if any) have either been completed or
+ * failed with an unrecoverable error.
+ *
+ * It is also possible that the response indicates an incorrect target 
brokerId
+ * (e.g. in the case of a NotLeader error when the request is bound for a 
partition
+ * leader). In this case the key will be "unmapped" from the target 
brokerId
+ * and lookup will be retried.
+ *
+ * Note that keys which received a retriable error should be left out of 
the
+ * result. They will be retried automatically.
+ *
+ * @param brokerId the brokerId that the associated request was sent to
+ * @param keys the set of keys from the associated request
+ * @param response the response received from the broker
+ *
+ * @return result indicating key completion, failure, and unmapping
+ */
+ApiResult<K, V> handleResponse(Integer brokerId, Set<K> keys, 
AbstractResponse response);
+
+class KeyMappings<K> {
+public final Optional<StaticKeyMapping<K>> staticMapping;

Review comment:
   Do you plan to add more fields to `StaticKeyMapping` and 
`DynamicKeyMapping` in the future? If not, it seems to me we can simplify the 
data structure.
   
   An empty `staticMapping` and a `staticMapping` holding an empty map are 
treated identically in processing. Could we simplify this structure, for 
example to a plain map?
   
   Also, how about renaming `keys` (of `DynamicKeyMapping`) to `dynamicKeys` 
and then moving both variables into `KeyMappings`?
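
   For illustration, the simplification being suggested might look like this 
(field names are illustrative, not the actual admin internals): fold both key 
sets into `KeyMappings` directly, so an absent static mapping and an empty map 
are the same thing:

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Illustrative restructuring: static keys map straight to a known brokerId,
// dynamic keys wait for lookup, and "no static mapping" is simply an empty map
// rather than Optional.empty().
class KeyMappings<K> {
    final Map<K, Integer> staticKeys; // key -> known destination brokerId
    final Set<K> dynamicKeys;         // keys whose broker must be discovered

    KeyMappings(final Map<K, Integer> staticKeys, final Set<K> dynamicKeys) {
        this.staticKeys = Collections.unmodifiableMap(staticKeys);
        this.dynamicKeys = Collections.unmodifiableSet(dynamicKeys);
    }
}
{code}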

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java
##
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */

[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304611#comment-17304611
 ] 

Luke Chen commented on KAFKA-12495:
---

[~ramkrish1489], I checked your PR in KAFKA-10413 again, and I have updated my 
bug description accordingly. You fixed the issue where a member left, by doing 
a round-robin distribution across all the active members. Nice fix!

The issue I'm trying to fix here is that when a new member joins, we have 2 
rounds of rebalance: one for revocation and one for re-assignment, and the 
problem is in the 2nd round. What we do now is re-assign all the previously 
revoked C/T to the new members and end the rebalance. However, if one or more 
additional members joined between the 1st and 2nd rounds, then in the 2nd round 
we assign all the revoked C/T to all of the new members, when they should only 
be re-assigned to "numberOfNewMember - 1" of them.

If you check the example in the bug description again, you should see what I 
mean. Thank you.

> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
>  Issue Type: Bug
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png, 
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement the incremental cooperative rebalance 
> algorithm based on KIP-415 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
>  However, the implementation makes a bad assumption: that after the revoking 
> rebalance completes, the member (worker) count will be the same as in the 
> previous round of rebalance.
>  
> Let's take a look at the example in KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance 
> completes and before the 2nd rebalance starts? Let's walk through this 
> example (we'll use 10 tasks here):
>  
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
> BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
> BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, 
> BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new 
> // members, which causes an unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> We cannot just assign the revoked C/T to the new members in the 2nd round of 
> rebalance. We should check if we need one more round of revocation.
>  
> Note: The consumer's cooperative sticky assignor won't have this issue since 
> we re-compute the assignment in each round.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman edited a comment on pull request #10356: KAFKA-12503: cache size set by the thread

2021-03-18 Thread GitBox


ableegoldman edited a comment on pull request #10356:
URL: https://github.com/apache/kafka/pull/10356#issuecomment-802491798


   I don't think we should aim to catch things in soak, although I'm not on the 
side of "we should never catch anything in soak/everything must have an 
integration test" either. It's just that this particular issue, and this 
feature in general really, calls for an integration test -- we should be 
flexing the cache resize codepath in a realistic scenario. I'm happy to leave 
that as followup work as long as we do get around to it.
   
   This looks good and the build passed so I'll merge this to unblock the 
release


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor

2021-03-18 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-12495:
--
Description: 
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on KIP-415 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
 However, the implementation makes a bad assumption: that after the revoking 
rebalance completes, the member (worker) count will be the same as in the 
previous round of rebalance.

 

Let's take a look at the example in KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well for most cases. But what if W4 is added after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's walk through this example 
(we'll use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
W3 joins with assignment: []

// one more member joined
W4 joins with assignment: []
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the new members, 
// which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
W4(delay: 0, assigned: [BT4, BT5], revoked: [])
{code}
We cannot just assign the revoked C/T to the new members in the 2nd round of 
rebalance. We should check if we need one more round of revocation.

 

Note: The consumer's cooperative sticky assignor won't have this issue since we 
re-compute the assignment in each round.
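
As a sketch of the extra check being proposed (names are illustrative, not the 
actual IncrementalCooperativeAssignor code), assuming a balanced target of 
ceil(totalTasks / memberCount):

{code:java}
import java.util.Map;

class AssignmentCheckSketch {
    // Illustrative: after computing a tentative assignment, decide whether a
    // further revocation round is needed instead of assuming the member count
    // is unchanged since the previous rebalance. Assumes at least one member.
    static boolean needsAnotherRevocation(final Map<String, Integer> tasksPerWorker) {
        final int total = tasksPerWorker.values().stream().mapToInt(Integer::intValue).sum();
        final int members = tasksPerWorker.size();
        final int ceiling = (total + members - 1) / members; // ceil(total / members)
        // If any worker holds more than its fair share, revoke the excess and
        // run one more round rather than ending the rebalance here.
        return tasksPerWorker.values().stream().anyMatch(count -> count > ceiling);
    }
}
{code}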

  was:
In Kafka Connect, we implement the incremental cooperative rebalance algorithm 
based on KIP-415 
([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
 However, the implementation makes a bad assumption: that after the revoking 
rebalance completes, the member (worker) count will be the same as in the 
previous round of rebalance.

 

Let's take a look at the example in KIP-415:

!image-2021-03-18-15-07-27-103.png|width=441,height=556!

It works well for most cases. But what if W3 leaves after the 1st rebalance 
completes and before the 2nd rebalance starts? Let's walk through this example 
(we'll use 10 tasks here):

 
{code:java}
Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, 
BT3, BT4, BT5])
Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, 
BT5
W1 is current leader
W2 joins with assignment: []
Rebalance is triggered
W3 joins while rebalance is still active with assignment: []
W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, 
BT4, BT5]
W1 becomes leader
W1 computes and sends assignments:
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, 
BT3, BT4, BT5])
W2(delay: 0, assigned: [], revoked: [])
W3(delay: 0, assigned: [], revoked: [])
W1 stops revoked resources
W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
Rebalance is triggered
W2 joins with assignment: []
// W3 is down
W3 doesn't join
W1 becomes leader
W1 computes and sends assignments:

// We assigned all the previously revoked Connectors/Tasks to the remaining 
// member, which causes an unbalanced distribution
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5], revoked: [])
{code}
We cannot assume the member count stays the same right after the revocation 
round.

 

Note: The consumer's cooperative sticky assignor won't have this issue since we 
re-compute the assignment in each round.


> Unbalanced connectors/tasks distribution will happen in Connect's incremental 
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: 

[jira] [Comment Edited] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation

2021-03-18 Thread Zhang Jianguo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304601#comment-17304601
 ] 

Zhang Jianguo edited comment on KAFKA-8608 at 3/19/21, 2:54 AM:


[~LillianY] [~xmar]

I'm hitting the same issue.

[2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to 
record follower 18's position 308399, and last sent HW since the replica is not 
recognized to be one of the assigned replicas 15,16 for partition 
Collect-gnodeb-24. Empty records will be returned for this partition. 
(kafka.server.ReplicaManager)

 

After the controller switched from 14 to 15, Kafka appears to have become 
abnormal. It didn't recover even after I restarted the brokers.

*Logs of Broker 14*

!image-2021-03-19-10-36-04-328.png!

!image-2021-03-19-10-41-44-952.png!

  !image-2021-03-19-10-42-16-296.png!

 

!image-2021-03-19-10-42-32-759.png!

 

*producer LOG*

!image-2021-03-19-10-41-03-203.png!

 

*Consumer got timeout exception:*

*!image-2021-03-19-10-39-24-728.png!*

 


was (Author: alberyzjg):
[~LillianY]

I'm hitting the same issue.

[2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to 
record follower 18's position 308399, and last sent HW since the replica is not 
recognized to be one of the assigned replicas 15,16 for partition 
Collect-gnodeb-24. Empty records will be returned for this partition. 
(kafka.server.ReplicaManager)

 

After the controller switched from 14 to 15, Kafka appears to have become 
abnormal. It didn't recover even after I restarted the brokers.

*Logs of Broker 14*

!image-2021-03-19-10-36-04-328.png!

!image-2021-03-19-10-41-44-952.png!

  !image-2021-03-19-10-42-16-296.png!

 

!image-2021-03-19-10-42-32-759.png!

 

*producer LOG*

!image-2021-03-19-10-41-03-203.png!

 

*Consumer got timeout exception:*

*!image-2021-03-19-10-39-24-728.png!*

 

> Broker shows WARN on reassignment partitions on new brokers: Replica LEO, 
> follower position & Cache truncation
> --
>
> Key: KAFKA-8608
> URL: https://issues.apache.org/jira/browse/KAFKA-8608
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
> Environment: Kafka 2.1.1
>Reporter: Di Campo
>Priority: Minor
>  Labels: broker, reassign, repartition
> Attachments: image-2021-03-19-10-36-04-328.png, 
> image-2021-03-19-10-39-24-728.png, image-2021-03-19-10-41-03-203.png, 
> image-2021-03-19-10-41-44-952.png, image-2021-03-19-10-42-16-296.png, 
> image-2021-03-19-10-42-32-759.png
>
>
> I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
> there were 32 topics and 64 partitions on each, replication 3.
> Then I ran a partition reassignment. 
> On each run, I can see the following WARN messages, but when the reassignment 
> partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
> partition).
> But I get the following message types, one per partition:
>  
> {code:java}
> [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
> entry EpochEntry(epoch=24, startOffset=51540) caused truncation of 
> conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). 
> Cache now contains 5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
> -> This relates to cache, so I suppose it's pretty safe.
> {code:java}
> [2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
> record follower 3's position 47981 since the replica is not recognized to be 
> one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
> records will be returned for this partition. 
> (kafka.server.ReplicaManager){code}
> -> This is scary. I'm not sure about the severity of this, but it looks like 
> it may be missing records? 
> {code:java}
> [2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
> replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
> (kafka.server.ReplicaManager){code}
> -> Here, these partitions are created. 
> First of all - am I supposed to be missing data here? I am assuming I don't, 
> and so far I don't see traces of losing anything.
> If so, I'm not sure what these messages are trying to say here. Should they 
> really be at WARN level? If so - should the message clarify better the 
> different risks involved? 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation

2021-03-18 Thread Zhang Jianguo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304601#comment-17304601
 ] 

Zhang Jianguo edited comment on KAFKA-8608 at 3/19/21, 2:42 AM:


[~LillianY]

I'm hitting the same issue.

[2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to 
record follower 18's position 308399, and last sent HW since the replica is not 
recognized to be one of the assigned replicas 15,16 for partition 
Collect-gnodeb-24. Empty records will be returned for this partition. 
(kafka.server.ReplicaManager)

 

After the controller switched from 14 to 15, Kafka appears to have become 
abnormal. It didn't recover even after I restarted the brokers.

*Logs of Broker 14*

!image-2021-03-19-10-36-04-328.png!

!image-2021-03-19-10-41-44-952.png!

  !image-2021-03-19-10-42-16-296.png!

 

!image-2021-03-19-10-42-32-759.png!

 

*producer LOG*

!image-2021-03-19-10-41-03-203.png!

 

*Consumer got timeout exception:*

*!image-2021-03-19-10-39-24-728.png!*

 


was (Author: alberyzjg):
[~LillianY]

I'm hitting the same issue.

[2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to 
record follower 18's position 308399, and last sent HW since the replica is not 
recognized to be one of the assigned replicas 15,16 for partition 
Collect-gnodeb-24. Empty records will be returned for this partition. 
(kafka.server.ReplicaManager)

 

After the controller switched from 14 to 15, Kafka appears to have become 
abnormal. It didn't recover even after I restarted the brokers.

!image-2021-03-19-10-36-04-328.png!

!image-2021-03-19-10-37-35-183.png!

 

!image-2021-03-19-10-38-11-280.png!

 

!image-2021-03-19-10-38-22-154.png!

 

*producer LOG*

*!image-2021-03-19-10-38-38-396.png!*

 

*Consumer got timeout exception:*

*!image-2021-03-19-10-39-24-728.png!*

 

> Broker shows WARN on reassignment partitions on new brokers: Replica LEO, 
> follower position & Cache truncation
> --
>
> Key: KAFKA-8608
> URL: https://issues.apache.org/jira/browse/KAFKA-8608
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
> Environment: Kafka 2.1.1
>Reporter: Di Campo
>Priority: Minor
>  Labels: broker, reassign, repartition
> Attachments: image-2021-03-19-10-36-04-328.png, 
> image-2021-03-19-10-39-24-728.png, image-2021-03-19-10-41-03-203.png, 
> image-2021-03-19-10-41-44-952.png, image-2021-03-19-10-42-16-296.png, 
> image-2021-03-19-10-42-32-759.png
>
>
> I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
> there were 32 topics and 64 partitions on each, replication 3.
> Then I ran a partition reassignment. 
> On each run, I can see the following WARN messages, but when the reassignment 
> partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
> partition).
> But I get the following message types, one per partition:
>  
> {code:java}
> [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
> entry EpochEntry(epoch=24, startOffset=51540) caused truncation of 
> conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). 
> Cache now contains 5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
> -> This relates to cache, so I suppose it's pretty safe.
> {code:java}
> [2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
> record follower 3's position 47981 since the replica is not recognized to be 
> one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
> records will be returned for this partition. 
> (kafka.server.ReplicaManager){code}
> -> This is scary. I'm not sure about the severity of this, but it looks like 
> it may be missing records? 
> {code:java}
> [2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
> replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
> (kafka.server.ReplicaManager){code}
> -> Here, these partitions are created. 
> First of all - am I supposed to be missing data here? I am assuming I don't, 
> and so far I don't see traces of losing anything.
> If so, I'm not sure what these messages are trying to say here. Should they 
> really be at WARN level? If so - should the message clarify better the 
> different risks involved? 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation

2021-03-18 Thread Zhang Jianguo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304601#comment-17304601
 ] 

Zhang Jianguo commented on KAFKA-8608:
--

[~LillianY]

I'm hitting the same issue.

[2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to 
record follower 18's position 308399, and last sent HW since the replica is not 
recognized to be one of the assigned replicas 15,16 for partition 
Collect-gnodeb-24. Empty records will be returned for this partition. 
(kafka.server.ReplicaManager)

 

After the controller switched from 14 to 15, Kafka appears to have become 
abnormal. It didn't recover even after I restarted the brokers.

!image-2021-03-19-10-36-04-328.png!

!image-2021-03-19-10-37-35-183.png!

 

!image-2021-03-19-10-38-11-280.png!

 

!image-2021-03-19-10-38-22-154.png!

 

*producer LOG*

*!image-2021-03-19-10-38-38-396.png!*

 

*Consumer got timeout exception:*

*!image-2021-03-19-10-39-24-728.png!*

 

> Broker shows WARN on reassignment partitions on new brokers: Replica LEO, 
> follower position & Cache truncation
> --
>
> Key: KAFKA-8608
> URL: https://issues.apache.org/jira/browse/KAFKA-8608
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
> Environment: Kafka 2.1.1
>Reporter: Di Campo
>Priority: Minor
>  Labels: broker, reassign, repartition
> Attachments: image-2021-03-19-10-36-04-328.png, 
> image-2021-03-19-10-39-24-728.png
>
>
> I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
> there were 32 topics and 64 partitions on each, replication 3.
> Then I ran a partition reassignment. 
> On each run, I can see the following WARN messages, but when the reassignment 
> partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
> partition).
> But I get the following message types, one per partition:
>  
> {code:java}
> [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
> entry EpochEntry(epoch=24, startOffset=51540) caused truncation of 
> conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). 
> Cache now contains 5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
> -> This relates to cache, so I suppose it's pretty safe.
> {code:java}
> [2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
> record follower 3's position 47981 since the replica is not recognized to be 
> one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
> records will be returned for this partition. 
> (kafka.server.ReplicaManager){code}
> -> This is scary. I'm not sure about the severity of this, but it looks like 
> it may be missing records? 
> {code:java}
> [2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
> replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
> (kafka.server.ReplicaManager){code}
> -> Here, these partitions are created. 
> First of all - am I supposed to be missing data here? I am assuming I don't, 
> and so far I don't see traces of losing anything.
> If so, I'm not sure what these messages are trying to say here. Should they 
> really be at WARN level? If so - should the message clarify better the 
> different risks involved? 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation

2021-03-18 Thread Zhang Jianguo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhang Jianguo updated KAFKA-8608:
-
Attachment: image-2021-03-19-10-39-24-728.png

> Broker shows WARN on reassignment partitions on new brokers: Replica LEO, 
> follower position & Cache truncation
> --
>
> Key: KAFKA-8608
> URL: https://issues.apache.org/jira/browse/KAFKA-8608
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
> Environment: Kafka 2.1.1
>Reporter: Di Campo
>Priority: Minor
>  Labels: broker, reassign, repartition
> Attachments: image-2021-03-19-10-36-04-328.png, 
> image-2021-03-19-10-39-24-728.png
>
>
> I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
> there were 32 topics and 64 partitions on each, replication 3.
> Then I ran a partition reassignment. 
> On each run, I can see the following WARN messages, but when the reassignment 
> partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
> partition).
> But I get the following message types, one per partition:
>  
> {code:java}
> [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
> entry EpochEntry(epoch=24, startOffset=51540) caused truncation of 
> conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). 
> Cache now contains 5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
> -> This relates to cache, so I suppose it's pretty safe.
> {code:java}
> [2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
> record follower 3's position 47981 since the replica is not recognized to be 
> one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
> records will be returned for this partition. 
> (kafka.server.ReplicaManager){code}
> -> This is scary. I'm not sure about the severity of this, but it looks like 
> it may be missing records? 
> {code:java}
> [2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
> replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
> (kafka.server.ReplicaManager){code}
> -> Here, these partitions are created. 
> First of all - am I supposed to be missing data here? I am assuming I don't, 
> and so far I don't see traces of losing anything.
> If so, I'm not sure what these messages are trying to say here. Should they 
> really be at WARN level? If so - should the message clarify better the 
> different risks involved? 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12503.

Resolution: Fixed

> Resizing the thread cache in a non thread safe way can cause records to be 
> redirected throughout the topology
> -
>
> Key: KAFKA-12503
> URL: https://issues.apache.org/jira/browse/KAFKA-12503
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Blocker
> Fix For: 2.8.0
>
>
> When a thread is added, removed, or replaced, the cache is resized. The 
> resize was being performed from the thread initiating these calls, which can 
> cause a record to be redirected to the wrong processor via the cache's 
> `evict` call: evict flushes records downstream to the next processor after 
> the cache, and if it runs on the wrong thread, the wrong processor receives 
> them.
> This can cause 3 problems:
> 1) When the owner finishes processing the record it sets the current node to 
> null in the processor context, which then causes the other processor to throw 
> `StreamsException: Current node is unknown.`
> 2) Depending on the types involved it can cause a ClassCastException, since 
> the record is of a different type. Mostly this happened when the value types 
> differed inside the map node following a toStream call.
> 3) A silent issue: it could cause data to be processed by the wrong node and 
> corrupt it. We have not been able to confirm this last one, but it is the 
> most dangerous in many ways.
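
A minimal sketch of the fix direction the summary describes ("cache size set by 
the thread"), with hypothetical names: the external caller only records the 
requested size, and each StreamThread applies the resize, and therefore the 
evictions, on itself:

{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: defer the resize to the cache-owning thread so that
// evict() flushes records downstream on the correct thread.
class ResizableCacheSketch {
    private final AtomicLong requestedMaxBytes = new AtomicLong(-1);

    // Called from the thread adding/removing/replacing StreamThreads.
    void requestResize(final long newMaxBytes) {
        requestedMaxBytes.set(newMaxBytes);
    }

    // Called by the owning StreamThread at a safe point in its loop.
    void maybeApplyResize() {
        final long newMax = requestedMaxBytes.getAndSet(-1);
        if (newMax >= 0) {
            resizeAndEvict(newMax); // evictions now flush on the owning thread
        }
    }

    private void resizeAndEvict(final long maxBytes) { /* elided */ }
}
{code}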



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation

2021-03-18 Thread Zhang Jianguo (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhang Jianguo updated KAFKA-8608:
-
Attachment: image-2021-03-19-10-36-04-328.png

> Broker shows WARN on reassignment partitions on new brokers: Replica LEO, 
> follower position & Cache truncation
> --
>
> Key: KAFKA-8608
> URL: https://issues.apache.org/jira/browse/KAFKA-8608
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.1
> Environment: Kafka 2.1.1
>Reporter: Di Campo
>Priority: Minor
>  Labels: broker, reassign, repartition
> Attachments: image-2021-03-19-10-36-04-328.png
>
>
> I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where 
> there were 32 topics and 64 partitions on each, replication 3.
> Then I ran a partition reassignment. 
> On each run, I can see the following WARN messages, but when the reassignment 
> partition process finishes, it all seems OK. ISR is OK (count is 3 in every 
> partition).
> But I get the following message types, one per partition:
>  
> {code:java}
> [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch 
> entry EpochEntry(epoch=24, startOffset=51540) caused truncation of 
> conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). 
> Cache now contains 5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code}
> -> This relates to cache, so I suppose it's pretty safe.
> {code:java}
> [2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to 
> record follower 3's position 47981 since the replica is not recognized to be 
> one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty 
> records will be returned for this partition. 
> (kafka.server.ReplicaManager){code}
> -> This is scary. I'm not sure about the severity of this, but it looks like 
> it may be missing records? 
> {code:java}
> [2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the 
> replica LEO, the partition visitors-0.0.1-58 hasn't been created. 
> (kafka.server.ReplicaManager){code}
> -> Here, these partitions are created. 
> First of all - am I supposed to be missing data here? I am assuming I don't, 
> and so far I don't see traces of losing anything.
> If so, I'm not sure what these messages are trying to say here. Should they 
> really be at WARN level? If so - should the message clarify better the 
> different risks involved? 
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10356: KAFKA-12503: cache size set by the thread

2021-03-18 Thread GitBox


ableegoldman commented on pull request #10356:
URL: https://github.com/apache/kafka/pull/10356#issuecomment-802493975


   Merged and cherry-picked to 2.8, cc @vvcephei 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman merged pull request #10356: KAFKA-12503: cache size set by the thread

2021-03-18 Thread GitBox


ableegoldman merged pull request #10356:
URL: https://github.com/apache/kafka/pull/10356


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10356: KAFKA-12503: cache size set by the thread

2021-03-18 Thread GitBox


ableegoldman commented on pull request #10356:
URL: https://github.com/apache/kafka/pull/10356#issuecomment-802491798


   I don't think we should aim to catch things in soak, although I'm not on the 
side of "we should never catch anything in soak/everything must have an 
integration test" either. This particular issue does seem appropriate for an 
integration test -- as I said, concurrency bugs are hard to actually reproduce, 
but we should absolutely be flexing the cache resize codepath in an integration 
test. I'm happy to leave that as followup work as long as we do get around to 
it.
   
   This looks good and the build passed so I'll merge this to unblock the 
release


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12507) java.lang.OutOfMemoryError: Direct buffer memory

2021-03-18 Thread diehu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304600#comment-17304600
 ] 

diehu commented on KAFKA-12507:
---

We did not set -XX:MaxDirectMemorySize, so it defaults to the -Xmx value (1G). 
In server.properties we set num.network.threads=16 and num.io.threads=6; 
socket.request.max.bytes is not set.

> java.lang.OutOfMemoryError: Direct buffer memory
> 
>
> Key: KAFKA-12507
> URL: https://issues.apache.org/jira/browse/KAFKA-12507
> Project: Kafka
>  Issue Type: Bug
>  Components: core
> Environment: kafka version: 2.0.1
> java version: 
> java version "1.8.0_211"
> Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)
> the command we use to start kafka broker:
> java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true 
> -XX:+ExplicitGCInvokesConcurrent 
>Reporter: diehu
>Priority: Major
>
> Hi, we have three brokers in our kafka cluster, and we use scripts to send 
> data to kafka at a rate of about 3.6w (36,000) events per second. After about 
> one month, we got the OOM error: 
> {code:java}
> [2021-01-09 17:12:24,750] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:694)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:562)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:498)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748){code}
>  the kafka server does not shut down, but keeps hitting this error. And at 
> the same time, data cannot be produced to the kafka cluster and consumers 
> cannot consume data from it.
> We used the recommended java parameter -XX:+ExplicitGCInvokesConcurrent but 
> it did not seem to help. 
>  Only a restart of the kafka cluster fixes this problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12507) java.lang.OutOfMemoryError: Direct buffer memory

2021-03-18 Thread Wenbing Shen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304593#comment-17304593
 ] 

Wenbing Shen commented on KAFKA-12507:
--

What values did you set for these parameters: 
-XX:MaxDirectMemorySize, num.network.threads, socket.request.max.bytes?

We have encountered the same problem in a customer's environment. We had set 
-XX:MaxDirectMemorySize=2G and num.network.threads=120. Later, I worked around 
this problem by calculating the real-time traffic and reducing 
num.network.threads.
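
For a rough sense of why that helps: each network thread can hold a temporary 
direct buffer up to the size of the request it is currently reading, so (as an 
approximation, not an exact accounting of the broker's allocations) the 
worst-case direct memory demand scales with num.network.threads times the 
maximum request size. A minimal back-of-the-envelope sketch, assuming the 
default socket.request.max.bytes of 100 MB:

{code:java}
// Illustrative numbers only; adjust to the actual broker configuration.
public class DirectMemoryEstimate {
    public static void main(String[] args) {
        int numNetworkThreads = 16;            // num.network.threads
        long maxRequestBytes = 104_857_600L;   // socket.request.max.bytes default (100 MB)
        long maxDirectMemoryBytes = 1L << 30;  // -XX:MaxDirectMemorySize defaults to -Xmx (1 GB here)

        long worstCaseBytes = (long) numNetworkThreads * maxRequestBytes;
        System.out.printf("worst case ~%d MB of direct buffers vs a %d MB limit%n",
                worstCaseBytes >> 20, maxDirectMemoryBytes >> 20);
    }
}
{code}

With 16 network threads that bound is about 1.6 GB against a 1 GB limit, which 
is consistent with the OOM above and with reducing num.network.threads (or 
raising -XX:MaxDirectMemorySize) making the problem go away.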

> java.lang.OutOfMemoryError: Direct buffer memory
> 
>
> Key: KAFKA-12507
> URL: https://issues.apache.org/jira/browse/KAFKA-12507
> Project: Kafka
>  Issue Type: Bug
>  Components: core
> Environment: kafka version: 2.0.1
> java version: 
> java version "1.8.0_211"
> Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
> Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)
> the command we use to start kafka broker:
> java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true 
> -XX:+ExplicitGCInvokesConcurrent 
>Reporter: diehu
>Priority: Major
>
> Hi, we have three brokers in our kafka cluster, and we use scripts to send 
> data to kafka at a rate of about 3.6w (36,000) events per second. After about 
> one month, we got the OOM error: 
> {code:java}
> [2021-01-09 17:12:24,750] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:694)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at 
> org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at 
> org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
> at 
> org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:562)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:498)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748){code}
>  the kafka server does not shut down, but keeps hitting this error. And at 
> the same time, data cannot be produced to the kafka cluster and consumers 
> cannot consume data from it.
> We used the recommended java parameter -XX:+ExplicitGCInvokesConcurrent but 
> it did not seem to help. 
>  Only a restart of the kafka cluster fixes this problem.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-18 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304592#comment-17304592
 ] 

hudeqi commented on KAFKA-12478:


Thank you very much for your reply! I belong to the Kuaishou message-oriented 
middleware group, which is mainly responsible for the secondary development and 
customization of kafka.

  Kafka is widely used throughout the company, and the case described in this 
issue was recently discovered by a business partner who is very sensitive to 
data. This alarmed us: the company has a large number of topics, add-partition 
is a relatively high-frequency operation, and a considerable part of the 
business uses the latest setting. If the consumer client perceives the 
expansion later than the producer client does, data will definitely be lost. 
For a storage middleware, losing data is a serious problem. *Although this 
problem can be avoided by configuring earliest, that is not elegant, and the 
company uses clients in many other languages, such as rdkafka, go, python, etc. 
We want the fix to be transparent to the client without losing data; moreover, 
if the amount of topic data is large, "earliest" may also put some pressure on 
the kafka servers, so we want to optimize the server logic to solve this case 
almost completely.*

  Looking forward to your reply!

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: hudeqi
>Priority: Blocker
>  Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem was exposed in our production environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that data was lost.*
>   After a preliminary investigation, the lost data was all concentrated in 
> the newly expanded partitions. The reason: when the server expands, the 
> producer perceives the expansion first, and some data is written to the 
> newly expanded partitions. The consumer group perceives the expansion later; 
> after the rebalance is completed, the newly expanded partitions are consumed 
> from the latest offset if the group is set to consume from the latest. For a 
> period of time, the data in the newly expanded partitions is therefore 
> skipped and lost by the consumer.
>   Setting such a huge-data-flow topic to consume from the earliest at 
> startup would instead make the group consume historical data from the broker 
> heavily, which would affect the performance of brokers to a certain extent. 
> Therefore, *it is necessary to consume just the newly expanded partitions 
> from the earliest separately.*
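
For reference, the client-side workaround mentioned above is the standard 
reset policy; a minimal consumer sketch (broker address, group id, topic, and 
serdes are placeholders, not taken from the issue):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EarliestResetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "monitoring-consumers");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // Newly added partitions have no committed offsets, so they fall back
        // to this reset policy; "earliest" avoids silently skipping their
        // first records.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("monitoring-topic")); // hypothetical topic
    }
}
{code}

As the reporter notes, this has to be applied in every client (and every 
client language), which is why a server-side fix is being proposed instead.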



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-18 Thread hudeqi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

hudeqi updated KAFKA-12478:
---
Comment: was deleted

(was:   Thank you very much for your reply! I belong to the Kuaishou 
message-oriented middleware group, which is mainly responsible for the 
secondary development and customization of kafka.

  Kafka is widely used throughout the company, and the case described in this 
issue was recently discovered by a business partner who is very sensitive to 
data. This alarmed us: the company has a large number of topics, add-partition 
is a relatively high-frequency operation, and a considerable part of the 
business uses the latest setting. If the consumer client perceives the 
expansion later than the producer client does, data will definitely be lost. 
For a storage middleware, losing data is a serious problem. *Although this 
problem can be avoided by configuring earliest, that is not elegant, and the 
company uses clients in many other languages, such as rdkafka, go, python, etc. 
We want the fix to be transparent to the client without losing data; moreover, 
if the amount of topic data is large, "earliest" may also put some pressure on 
the kafka servers, so we want to optimize the server logic to solve this case 
almost completely.*

  Looking forward to your reply!)

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: hudeqi
>Priority: Blocker
>  Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem was exposed in our production environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that data was lost.*
>   After a preliminary investigation, the lost data was all concentrated in 
> the newly expanded partitions. The reason: when the server expands, the 
> producer perceives the expansion first, and some data is written to the 
> newly expanded partitions. The consumer group perceives the expansion later; 
> after the rebalance is completed, the newly expanded partitions are consumed 
> from the latest offset if the group is set to consume from the latest. For a 
> period of time, the data in the newly expanded partitions is therefore 
> skipped and lost by the consumer.
>   Setting such a huge-data-flow topic to consume from the earliest at 
> startup would instead make the group consume historical data from the broker 
> heavily, which would affect the performance of brokers to a certain extent. 
> Therefore, *it is necessary to consume just the newly expanded partitions 
> from the earliest separately.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest

2021-03-18 Thread hudeqi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304285#comment-17304285
 ] 

hudeqi edited comment on KAFKA-12478 at 3/19/21, 2:16 AM:
--

  Thank you very much for your reply! I belong to the Kuaishou 
message-oriented middleware group, which is mainly responsible for the 
secondary development and customization of kafka.

  Kafka is widely used throughout the company, and the case described in this 
issue was recently discovered by a business partner who is very sensitive to 
data. This alarmed us: the company has a large number of topics, add-partition 
is a relatively high-frequency operation, and a considerable part of the 
business uses the latest setting. If the consumer client perceives the 
expansion later than the producer client does, data will definitely be lost. 
For a storage middleware, losing data is a serious problem. *Although this 
problem can be avoided by configuring earliest, that is not elegant, and the 
company uses clients in many other languages, such as rdkafka, go, python, etc. 
We want the fix to be transparent to the client without losing data; moreover, 
if the amount of topic data is large, "earliest" may also put some pressure on 
the kafka servers, so we want to optimize the server logic to solve this case 
almost completely.*

  Looking forward to your reply!


was (Author: hudeqi):
  Thank you very much for your reply! I belong to the Kuaishou 
message-oriented middleware group, which is mainly responsible for the 
secondary development and customization of kafka.

  Kafka is widely used throughout the company, and the case described in this 
issue was recently discovered by a business partner who is very sensitive to 
data. This alarmed us: the company has a large number of topics, add-partition 
is a relatively high-frequency operation, and a considerable part of the 
business uses the latest setting. If the consumer client perceives the 
expansion later than the producer client does, data will definitely be lost. 
For a storage middleware, losing data is a serious problem. *Although this 
problem can be avoided by configuring earliest, that is not elegant, and the 
company uses clients in many other languages, such as rdkafka, go, python, etc. 
We want the fix to be transparent to the client without losing data; moreover, 
if the amount of topic data is large, "earliest" may also put some pressure on 
the kafka servers, so we want to optimize the server logic to solve this case 
almost completely.*

  Looking forward to your reply!

> Consumer group may lose data for newly expanded partitions when add 
> partitions for topic if the group is set to consume from the latest
> ---
>
> Key: KAFKA-12478
> URL: https://issues.apache.org/jira/browse/KAFKA-12478
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.7.0
>Reporter: hudeqi
>Priority: Blocker
>  Labels: patch
>   Original Estimate: 1,158h
>  Remaining Estimate: 1,158h
>
>   This problem was exposed in our production environment: a topic is used to 
> produce monitoring data. *After expanding partitions, the consumer side of 
> the business reported that data was lost.*
>   After a preliminary investigation, the lost data was all concentrated in 
> the newly expanded partitions. The reason: when the server expands, the 
> producer perceives the expansion first, and some data is written to the 
> newly expanded partitions. The consumer group perceives the expansion later; 
> after the rebalance is completed, the newly expanded partitions are consumed 
> from the latest offset if the group is set to consume from the latest. For a 
> period of time, the data in the newly expanded partitions is therefore 
> skipped and lost by the consumer.
>   Setting such a huge-data-flow topic to consume from the earliest at 
> startup would instead make the group consume historical data from the broker 
> heavily, which would affect the performance of brokers to a certain extent. 
> Therefore, *it is necessary to consume just the newly expanded partitions 
> from the earliest separately.*



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10697) Remove ProduceResponse.responses

2021-03-18 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai resolved KAFKA-10697.

Fix Version/s: 3.0.0
   Resolution: Fixed

> Remove ProduceResponse.responses
> 
>
> Key: KAFKA-10697
> URL: https://issues.apache.org/jira/browse/KAFKA-10697
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chun-Hao Tang
>Priority: Minor
> Fix For: 3.0.0
>
>
> This is a follow-up of KAFKA-9628.
> related discussion: 
> https://github.com/apache/kafka/pull/9401#discussion_r518984349



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #10332: KAFKA-10697: Remove ProduceResponse.responses

2021-03-18 Thread GitBox


chia7712 merged pull request #10332:
URL: https://github.com/apache/kafka/pull/10332


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12507) java.lang.OutOfMemoryError: Direct buffer memory

2021-03-18 Thread diehu (Jira)
diehu created KAFKA-12507:
-

 Summary: java.lang.OutOfMemoryError: Direct buffer memory
 Key: KAFKA-12507
 URL: https://issues.apache.org/jira/browse/KAFKA-12507
 Project: Kafka
  Issue Type: Bug
  Components: core
 Environment: kafka version: 2.0.1
java version: 
java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

the command we use to start kafka broker:
java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
-XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true 
-XX:+ExplicitGCInvokesConcurrent 
Reporter: diehu


Hi, we have three brokers in our kafka cluster, and we use scripts to send data 
to kafka at a rate of about 3.6w (36,000) events per second. After about one 
month, we got the OOM error: 
{code:java}
[2021-01-09 17:12:24,750] ERROR Processor got uncaught exception. 
(kafka.network.Processor)
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:694)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at 
org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at 
org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:562)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:498)
at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
at kafka.network.Processor.poll(SocketServer.scala:679)
at kafka.network.Processor.run(SocketServer.scala:584)
at java.lang.Thread.run(Thread.java:748){code}
 the kafka server does not shut down, but keeps hitting this error. And at the 
same time, data cannot be produced to the kafka cluster and consumers cannot 
consume data from it.

We used the recommended java parameter -XX:+ExplicitGCInvokesConcurrent but it 
did not seem to help. 

 Only a restart of the kafka cluster fixes this problem.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12385) Remove FetchResponse#responseData

2021-03-18 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304553#comment-17304553
 ] 

Chia-Ping Tsai commented on KAFKA-12385:


This issue is blocked as we are introducing topic id :(

> Remove FetchResponse#responseData
> -
>
> Key: KAFKA-12385
> URL: https://issues.apache.org/jira/browse/KAFKA-12385
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> reference to [https://github.com/apache/kafka/pull/9758#discussion_r584142074]
> We can rewrite related code to avoid using stale data structure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on pull request #10356: KAFKA-12503: cache size set by the thread

2021-03-18 Thread GitBox


wcarlson5 commented on pull request #10356:
URL: https://github.com/apache/kafka/pull/10356#issuecomment-802427967


   I don't know about testing this via an integration test. I worry about 
introducing just another flaky test; these types of issues get caught mostly in 
soak. Even with the optimal conditions for it to go wrong, it took a couple of 
days.
   
   I will think about how to improve the tests, but that will happen in a 
different PR for sure.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304551#comment-17304551
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12506:


cc [~wcarlson5] -- the first two items are probably a prerequisite for the test 
I described 
[here|https://github.com/apache/kafka/pull/10356#pullrequestreview-615974924], 
I'll try and remember to cross them off this list when we have something like 
that

> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503]
> We should try to expand the Streams application in this test to really flex 
> the feature. Imo this would include 
> # Produce some data to the input topic 
> # One or more stateful operators in the topology, with caching enabled
> # A repartition to get multiple subtopologies
> # Write out some results to an output topic
> # Verify that these results show up in the output
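
A sketch of a topology that would flex all five points (topic, store, and 
serde choices here are illustrative, not taken from the existing test):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class CacheFlexTopology {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            // grouping by value forces a repartition, giving a second subtopology
            .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
            // a stateful count; record caching is enabled for the store by default
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
            .toStream()
            .to("output", Produced.with(Serdes.String(), Serdes.Long()));
    }
}
{code}

Data produced to "input" should then show up aggregated on "output", giving 
the test something concrete to verify while threads are added and removed.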



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12506:
---
Description: 
Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
consumes a single input topic, and doesn't produce any data to this topic. Some 
of the complex concurrency bugs that we've found only showed up when we had 
some actual data to process and a stateful topology: 
[KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503]

We should try to expand the Streams application in this test to really flex the 
feature. Imo this would include 

# Produce some data to the input topic 
# One or more stateful operators in the topology, with caching enabled
# A repartition to get multiple subtopologies
# Write out some results to an output topic
# Verify that these results show up in the output

  was:
Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
consumes a single input topic, and doesn't produce any data to this topic. Some 
of the complex concurrency bugs that we've found only showed up when we had 
some actual data to process and a stateful topology: 
[KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503]

We should try to expand the Streams application in this test to really flex the 
feature. Imo this would include 

# One or more stateful operators in the topology, with caching enabled
# A repartition to get multiple subtopologies
# Writing to an output topic
# Produce some data to the input topic and verify that some results show up in 
the output


> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503]
> We should try to expand the Streams application in this test to really flex 
> the feature. Imo this would include 
> # Produce some data to the input topic 
> # One or more stateful operators in the topology, with caching enabled
> # A repartition to get multiple subtopologies
> # Write out some results to an output topic
> # Verify that these results show up in the output



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12506:
--

 Summary: Expand AdjustStreamThreadCountTest
 Key: KAFKA-12506
 URL: https://issues.apache.org/jira/browse/KAFKA-12506
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: A. Sophie Blee-Goldman


Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
consumes a single input topic, and doesn't produce any data to this topic. Some 
of the complex concurrency bugs that we've found only showed up when we had 
some actual data to process and a stateful topology: 
[KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503]

We should try to expand the Streams application in this test to really flex the 
feature. Imo this would include 

# One or more stateful operators in the topology, with caching enabled
# A repartition to get multiple subtopologies
# Writing to an output topic
# Produce some data to the input topic and verify that some results show up in 
the output



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12506) Expand AdjustStreamThreadCountTest

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12506:
---
Labels: newbie newbie++  (was: )

> Expand AdjustStreamThreadCountTest
> --
>
> Key: KAFKA-12506
> URL: https://issues.apache.org/jira/browse/KAFKA-12506
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, newbie++
>
> Right now the AdjustStreamThreadCountTest runs a minimal topology that just 
> consumes a single input topic, and doesn't produce any data to this topic. 
> Some of the complex concurrency bugs that we've found only showed up when we 
> had some actual data to process and a stateful topology: 
> [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503]
> We should try to expand the Streams application in this test to really flex 
> the feature. Imo this would include 
> # One or more stateful operators in the topology, with caching enabled
> # A repartition to get multiple subtopologies
> # Writing to an output topic
> # Produce some data to the input topic and verify that some results show up 
> in the output



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #10357: MINOR: KIP-500 Cluster ID is a String

2021-03-18 Thread GitBox


hachikuji commented on pull request #10357:
URL: https://github.com/apache/kafka/pull/10357#issuecomment-802417977


   @vvcephei Where are we with the RC? Do we still have time to merge this to 
2.8? We can do this after the release if necessary, but we'd prefer not to 
break the protocol if we don't have to.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12505) Should kafka-storage.sh accept a non-UUID for its --cluster-id parameter?

2021-03-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12505:

Labels: kip-500  (was: )

> Should kafka-storage.sh accept a non-UUID for its --cluster-id parameter?
> -
>
> Key: KAFKA-12505
> URL: https://issues.apache.org/jira/browse/KAFKA-12505
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ron Dagostino
>Priority: Minor
>  Labels: kip-500
>
> Should StorageTool support accepting non-UUIDs via its --cluster-id argument? 
>  One purpose of the tool is to minimize the chance that a broker could use 
> data from the wrong volume (i.e. data from another cluster). Generating a 
> random UUID via the --random-uuid parameter encourages using a globally 
> unique value for every cluster and is consistent with the behavior today with 
> ZooKeeper, whereas allowing a non-UUID here would increase the chance that 
> someone could reuse a Cluster ID value across clusters and short-circuit the 
> risk mitigation that this tool provides.
> Discuss...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10356: KAFKA-12503: cache size set by the thread

2021-03-18 Thread GitBox


ableegoldman commented on pull request #10356:
URL: https://github.com/apache/kafka/pull/10356#issuecomment-802411941


   One unrelated test failure: 
`kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaAssign`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10357: MINOR: KIP-500 Cluster ID is a String

2021-03-18 Thread GitBox


rondagostino commented on a change in pull request #10357:
URL: https://github.com/apache/kafka/pull/10357#discussion_r597326794



##
File path: core/src/main/scala/kafka/tools/StorageTool.scala
##
@@ -198,7 +198,7 @@ object StorageTool extends Logging {
 s"does not appear to be a valid UUID: ${e.getMessage}")
 }
 require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
-new MetaProperties(effectiveClusterId, config.nodeId)
+new MetaProperties(effectiveClusterId.toString, config.nodeId)

Review comment:
   > file a jira so we don't forget about it. 
   
   https://issues.apache.org/jira/browse/KAFKA-12505




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12505) Should kafka-storage.sh accept a non-UUID for its --cluster-id parameter?

2021-03-18 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-12505:
-

 Summary: Should kafka-storage.sh accept a non-UUID for its 
--cluster-id parameter?
 Key: KAFKA-12505
 URL: https://issues.apache.org/jira/browse/KAFKA-12505
 Project: Kafka
  Issue Type: New Feature
Reporter: Ron Dagostino


Should StorageTool support accepting non-UUIDs via its --cluster-id argument?  
One purpose of the tool is to minimize the chance that a broker could use data 
from the wrong volume (i.e. data from another cluster). Generating a random 
UUID via the --random-uuid parameter encourages using a globally unique value 
for every cluster and is consistent with the behavior today with ZooKeeper, 
whereas allowing a non-UUID here would increase the chance that someone could 
reuse a Cluster ID value across clusters and short-circuit the risk mitigation 
that this tool provides.

Discuss...



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12499) Adjust transaction timeout according to commit interval on Streams EOS

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304537#comment-17304537
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12499:


Nice catch, but just wondering: why choose 10x the commit interval? And should 
we set a lower bound as well, so we fall back on the default of 1 minute if 10 * 
commit_interval is below that? For example, with EOS that would mean a 
transaction timeout of just one second -- we only check whether it's time to 
commit after a loop of processing all tasks, so if there are a large number of 
tasks or some heavy processing, we'd just be constantly timing out.

> Adjust transaction timeout according to commit interval on Streams EOS
> --
>
> Key: KAFKA-12499
> URL: https://issues.apache.org/jira/browse/KAFKA-12499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 3.0.0
>
>
> The transaction timeout is set to 1 minute by default on Producer today, 
> while the commit interval on the other hand could be set to a very large 
> value, which makes the stream always hit transaction timeout and drop into 
> rebalance. We should increase the transaction timeout correspondingly when 
> commit interval is large.
> On the other hand, broker could have a limit on the max transaction timeout 
> to be set. If we scale up client transaction timeout over the limit, stream 
> will fail due to  INVALID_TRANSACTION_TIMEOUT. To alleviate this problem, 
> user could define their own customized transaction timeout to avoid hitting 
> the limit, so we should still respect what user configures in the override.
> The new rule for configuring transaction timeout should look like:
> 1. If transaction timeout is set in streams config, use it
> 2. if not, transaction_timeout = max(default_transaction_timeout, 10 * 
> commit_interval) 
> Additionally if INVALID_TRANSACTION_TIMEOUT was thrown on Streams when 
> calling initTransaction(), we should wrap the exception to inform user that 
> their setting for commit interval could potentially be too high and should 
> adjust.
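
A plain-arithmetic sketch of that rule (method and variable names are 
hypothetical, not the actual Streams code):

{code:java}
public class TxnTimeoutRule {
    static final long DEFAULT_TXN_TIMEOUT_MS = 60_000L; // producer default: 1 minute

    // userTxnTimeoutMs is null when transaction.timeout.ms is not set in the streams config
    static long effectiveTxnTimeoutMs(Long userTxnTimeoutMs, long commitIntervalMs) {
        if (userTxnTimeoutMs != null) {
            return userTxnTimeoutMs; // rule 1: explicit override wins
        }
        return Math.max(DEFAULT_TXN_TIMEOUT_MS, 10 * commitIntervalMs); // rule 2
    }

    public static void main(String[] args) {
        System.out.println(effectiveTxnTimeoutMs(null, 100L));     // EOS default interval -> 60000
        System.out.println(effectiveTxnTimeoutMs(null, 300_000L)); // large interval -> 3000000
    }
}
{code}

Note that the max() in rule 2 already acts as the lower bound raised in the 
comment above: with the EOS default commit interval of 100 ms, 10 * 
commit_interval would be only one second, but the result stays at the one 
minute default.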



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10357: MINOR: KIP-500 Cluster ID is a String

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10357:
URL: https://github.com/apache/kafka/pull/10357#discussion_r597325275



##
File path: core/src/main/scala/kafka/tools/StorageTool.scala
##
@@ -198,7 +198,7 @@ object StorageTool extends Logging {
 s"does not appear to be a valid UUID: ${e.getMessage}")
 }
 require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
-new MetaProperties(effectiveClusterId, config.nodeId)
+new MetaProperties(effectiveClusterId.toString, config.nodeId)

Review comment:
   I'm ok with doing this later. Let's be sure to file a jira so we don't 
forget about it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #10357: MINOR: KIP-500 Cluster ID is a String

2021-03-18 Thread GitBox


rondagostino commented on a change in pull request #10357:
URL: https://github.com/apache/kafka/pull/10357#discussion_r597324486



##
File path: core/src/main/scala/kafka/tools/StorageTool.scala
##
@@ -198,7 +198,7 @@ object StorageTool extends Logging {
 s"does not appear to be a valid UUID: ${e.getMessage}")
 }
 require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.")
-new MetaProperties(effectiveClusterId, config.nodeId)
+new MetaProperties(effectiveClusterId.toString, config.nodeId)

Review comment:
   > Should we also remove the [StorageTool] logic above which requires 
that the clusterId parses as a UUID 
   
   Basically, this is asking if `StorageTool` should support accepting 
non-UUIDs via its `--cluster-id` argument.  One purpose of the tool is to 
minimize the chance that a broker could use data from the wrong volume (i.e. 
data from another cluster).  Generating a random UUID via the `--random-uuid` 
parameter encourages using a globally unique value for every cluster and is 
consistent with the behavior today with ZooKeeper, whereas allowing a non-UUID 
here would increase the chance that someone could reuse a Cluster ID value 
across clusters and short-circuit the risk mitigation that this tool provides.
   
   Merging this PR now avoids an uncomfortable "rename" in 
`BrokerRegistrationRequest` (or a breaking change) after 2.8 is released; 
letting `StorageTool` accept non-UUIDs can be done at any time with no impact.  
Maybe we keep this PR about removing the constraint, which also simply cannot 
exist -- otherwise we can't upgrade clusters that don't use a UUID -- and leave 
this question about the `--cluster-id` parameter for a broader discussion 
later?  




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #10356: KAFKA-12503: cache size set by the thread

2021-03-18 Thread GitBox


wcarlson5 commented on pull request #10356:
URL: https://github.com/apache/kafka/pull/10356#issuecomment-802393840


   @guozhangwang @ableegoldman Can you take a look?
   
   @vvcephei I think this needs to be picked to 2.8 as well


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12504) KafkaMetadataLog should check clean shutdown file

2021-03-18 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-12504:

Description: When a `Log` is opened, we will skip recovery if the clean 
shutdown file is present. This is specified as an optional parameter, which has 
a default value of `true`. Currently `KafkaMetadataLog` relies on this default 
instead of properly checking for clean shutdown, which means we will always 
skip recovery for this log.  (was: When a `Log` is opened, we will skip 
recovery if the clean shutdown file is present. This is specified as an 
optional parameter, which has a default value of `true`. Currently 
`KafkaMetadataLog` relies on this default instead of properly checking for 
clean shutdown, which means we will always skip recovery for this og.)

> KafkaMetadataLog should check clean shutdown file
> -
>
> Key: KAFKA-12504
> URL: https://issues.apache.org/jira/browse/KAFKA-12504
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: kip-500
>
> When a `Log` is opened, we will skip recovery if the clean shutdown file is 
> present. This is specified as an optional parameter, which has a default 
> value of `true`. Currently `KafkaMetadataLog` relies on this default instead 
> of properly checking for clean shutdown, which means we will always skip 
> recovery for this log.
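
A sketch of the shape of such a fix (the marker file name matches the one the 
log layer uses, but the surrounding code is illustrative, not the actual 
KafkaMetadataLog change):

{code:java}
import java.io.File;

public class CleanShutdownCheck {
    public static void main(String[] args) {
        // Check for the clean shutdown marker explicitly rather than relying
        // on the parameter's default of true, so recovery runs when the
        // marker is absent.
        File logDir = new File("/tmp/kraft-metadata-log"); // hypothetical directory
        boolean hadCleanShutdown = new File(logDir, ".kafka_cleanshutdown").exists();
        // ... pass hadCleanShutdown into the call that opens the Log ...
        System.out.println("skip recovery? " + hadCleanShutdown);
    }
}
{code}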



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597309028



##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
 batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 10
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
   @hachikuji made changes to reuse snapshotId variable. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10563) Make sure task directories don't remain locked by dead threads

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-10563:
---
Fix Version/s: (was: 2.8.0)

> Make sure task directories don't remain locked by dead threads
> --
>
> Key: KAFKA-10563
> URL: https://issues.apache.org/jira/browse/KAFKA-10563
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> Most common/expected exceptions within Streams are handled gracefully, and 
> the thread will make sure to clean up all resources such as task locks during 
> shutdown. However, there are some instances where an unexpected exception 
> such as an IllegalStateException can leave some resources orphaned.
> We have seen this happen to task directories after an IllegalStateException 
> is hit during the TaskManager's rebalance handling logic – the Thread shuts 
> down, but loses track of some tasks before unlocking them. This blocks any 
> further work on that task by any other thread in the same instance.
> Previously we decided that this was "ok" because an IllegalStateException 
> means all bets are off. But with the upcoming work of KIP-663 and KIP-671, 
> users will be able to react smartly on dying threads and replace them with 
> new ones, making it more important than ever to ensure that the application 
> can continue on with no lasting repercussions of a thread death. If we allow 
> users to revive/replace a thread that dies due to IllegalStateException, that 
> thread should not be blocked from doing any work by the ghost of its 
> predecessor. 
> It might be easiest to just add some logic to the cleanup thread to verify 
> all the existing locks against the list of live threads, and remove any 
> zombie locks. But we probably want to do this purging more frequently than 
> the cleanup thread runs (10min by default) – so maybe we can leverage the 
> work in KIP-671 and have each thread purge any locks still owned by it after 
> the uncaught exception handler runs, but before the thread dies.
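
A sketch of that purging idea (the map and method here are hypothetical, not 
the actual StateDirectory code):

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ZombieLockPurger {
    // taskId -> name of the stream thread holding the task directory lock
    private final Map<String, String> lockOwners = new ConcurrentHashMap<>();

    // Remove any lock whose owning thread is no longer alive. This could run
    // in the cleanup thread, or in a dying thread right after its uncaught
    // exception handler, as suggested above.
    public void purgeZombieLocks(Set<String> liveThreadNames) {
        lockOwners.entrySet().removeIf(e -> !liveThreadNames.contains(e.getValue()));
    }
}
{code}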



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12503:
--

Assignee: Walker Carlson

> Resizing the thread cache in a non thread safe way can cause records to be 
> redirected throughout the topology
> -
>
> Key: KAFKA-12503
> URL: https://issues.apache.org/jira/browse/KAFKA-12503
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Blocker
> Fix For: 2.8.0
>
>
> When a thread is added, removed, or replaced, the cache is resized. That 
> resize was being performed by the thread initiating these calls. This can 
> cause records to be redirected to the wrong processor via the call to 
> `evict` in the cache: the evict flushes records downstream to the next 
> processor after the cache, but if it runs on the wrong thread, the wrong 
> processor receives them. 
> This can cause 3 problems.
> 1) When the owner finishes processing the record, it sets the current node 
> to null in the processor context, and this then causes the other processor 
> to throw an exception `StreamsException: Current node is unknown.`. 
> 2) Depending on the type, it can cause a class cast exception, as the record 
> is of a different type. Mostly this happened when the value types were 
> different inside of the map node from the toStream method. 
> 3) A silent issue: it could cause data to be processed by the wrong node and 
> cause data corruption. We have not been able to confirm this last one, but 
> it is the most dangerous in many ways.
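
A sketch of the thread-safe shape such a fix could take (all names are 
hypothetical; the point is that callers only publish the new size, while each 
stream thread applies it, and therefore evicts, on its own loop):

{code:java}
import java.util.concurrent.atomic.AtomicLong;

public class CacheResizeCoordinator {
    interface Cache {
        long maxBytes();
        void resize(long newMaxBytes); // eviction may flush records downstream
    }

    private final AtomicLong targetBytesPerThread = new AtomicLong(10L << 20);

    // Called by whichever thread adds/removes/replaces a StreamThread: it only
    // publishes the new size and never evicts on another thread's behalf.
    public void requestResize(long newBytesPerThread) {
        targetBytesPerThread.set(newBytesPerThread);
    }

    // Called by each StreamThread at a safe point in its own loop, so records
    // flushed by eviction stay on the thread that owns the processors.
    public void maybeApplyResize(Cache ownCache) {
        long target = targetBytesPerThread.get();
        if (ownCache.maxBytes() != target) {
            ownCache.resize(target);
        }
    }
}
{code}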



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will 
not work as consumers will still perform eager rebalancing as long as at least 
one of the partition assignors they are configured with does not support 
cooperative rebalancing. KAFKA-12487 should also be addressed before 
configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any 
sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters." As Connect and the tooling around it 
matures and automatic restarts of failed tasks become more popular, care should 
be taken to ensure that the consumer group churn created by restarting one or 
more tasks doesn't compromise the availability of other tasks by forcing them 
to temporarily yield up all of their partitions just to reclaim them after a 
rebalance has completed.

With that in mind, we should alter the default consumer configuration for sink 
tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically.

Importantly, this setting will only be a default, and any user-specified 
overrides either in the *worker config*:

 
{code:java}
consumer.partition.assignment.strategy=
{code}
 

or in the *connector config*:

 
{code:java}
"consumer.override.partition.assignment.strategy": ""
{code}
 

will still be respected.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, should only be applied to the Connect framework in an upcoming minor 
release.
h3. Workaround: manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in releases 2.4 and 
later that don't include this improvement: manually override either a connector 
configuration or an entire worker configuration.

In order to avoid task failures while the connector is being reconfigured, it 
is highly recommended that the consumer be configured with a list of both the 
new and the current partition assignment strategies, instead of just the new 
partition assignment strategy.

[jira] [Created] (KAFKA-12504) KafkaMetadataLog should check clean shutdown file

2021-03-18 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12504:
---

 Summary: KafkaMetadataLog should check clean shutdown file
 Key: KAFKA-12504
 URL: https://issues.apache.org/jira/browse/KAFKA-12504
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


When a `Log` is opened, we will skip recovery if the clean shutdown file is 
present. This is specified as an optional parameter, which has a default value 
of `true`. Currently `KafkaMetadataLog` relies on this default instead of 
properly checking for clean shutdown, which means we will always skip recovery 
for this og.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-12463:
--
Description: 
Kafka consumers have a pluggable [partition assignment 
interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
 that comes with several out-of-the-box implementations including the 
[RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
 
[RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
 
[StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
 and 
[CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].

If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
is used by default. Although there are some benefits to this assignor including 
stability of assignment across generations and simplicity of design, it comes 
with a major drawback: the number of active consumers in a group is limited to 
the number of partitions in the topic(s) with the most partitions. For an 
example of the worst case, in a consumer group where every member is subscribed 
to ten topics that each have one partition, only one member of that group will 
be assigned any topic partitions.

This can end up producing counterintuitive and even frustrating behavior when a 
sink connector is brought up with N tasks to read from some collection of 
topics with a total of N topic partitions, but some tasks end up idling and not 
processing any data.
h3. Proposed Change

*NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will 
not work as consumers will still perform eager rebalancing as long as at least 
one of the partition assignors they are configured with does not support 
cooperative rebalancing. KAFKA-12487 should also be addressed before 
configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any 
sink connectors.*

[KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
 introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable 
assignment across generations wherever possible, provide the most even 
assignment possible (taking into account possible differences in subscriptions 
across consumers in the group), and allow consumers to continue processing data 
during rebalance. The documentation for the assignor states that "Users should 
prefer this assignor for newer clusters."

We should alter the default consumer configuration for sink tasks to use the 
new {{CooperativeStickyAssignor}}. In order to do this in a 
backwards-compatible fashion that also enables rolling upgrades, this should be 
implemented by changing the {{Worker}} to set the following on the consumer 
configuration created for each sink connector task:
{code:java}
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}
This way, consumer groups for sink connectors on Connect clusters in the 
process of being upgraded will continue to use the {{RangeAssignor}} until all 
workers in the cluster have been upgraded, and then will switch over to the new 
{{CooperativeStickyAssignor}} automatically. But, this setting will be 
overwritten by any user-specified {{consumer.partition.assignment.strategy}} 
property in the worker configuration, and by any user-specified 
{{consumer.override.partition.assignment.strategy}} property in a sink 
connector configuration when per-connector client overrides is enabled in the 
worker config with {{connector.client.config.override.policy=ALL}}.

This improvement is viable as far back as -2.3- 2.4, when the 
{{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
fix, it should only be applied to the Connect framework in an upcoming minor 
release.
h3. Manually setting the partition assignment strategy

There is a simple workaround to achieve the same behavior in releases 2.4 and 
later that don't include this improvement: manually override either a connector 
configuration or an entire worker configuration.

In order to avoid task failures while the connector is being reconfigured, it 
is highly recommended that the consumer be configured with a list of both the 
new and the current partition assignment strategies, instead of just the new 
partition assignment strategy.


For example, to update a connector formerly configured to use the 
{{RangeAssignor}} strategy to instead use the {{RoundRobinAssignor}} strategy, 
add the following to the connector configuration:
{code:java}
"consumer.override.partition.assignment.strategy": 
"org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor"
{code}
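To apply the same change to every sink connector on a worker instead, the 
equivalent worker-level property (one plausible form, mirroring the 
connector-level override above) would be:
{code:java}
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor,org.apache.kafka.clients.consumer.RangeAssignor
{code}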
[jira] [Commented] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology

2021-03-18 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304523#comment-17304523
 ] 

Guozhang Wang commented on KAFKA-12503:
---

Thanks for the summary [~wcarlson5] [~ableegoldman]. I'm wondering if we should 
fix it together with https://issues.apache.org/jira/browse/KAFKA-12500 by 
adding a separate function in the cache that just frees the space corresponding 
to a thread without triggering eviction --- i.e. just clears the records in the 
buffer. Wondering what your current proposal for the fix is.
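A rough sketch of that idea (all names here are hypothetical, not the actual 
ThreadCache API):
{code:java}
// Hypothetical sketch: free a thread's share of the cache without triggering
// eviction, so no records are ever forwarded from the resizing thread.
public synchronized void freeWithoutEviction(final String threadName) {
    final NamedCache cache = caches.remove(threadName); // hypothetical per-thread map
    if (cache != null) {
        currentSizeBytes -= cache.sizeInBytes();        // hypothetical size accounting
        cache.clear(); // drop buffered records instead of flushing them downstream
    }
}
{code}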

> Resizing the thread cache in a non thread safe way can cause records to be 
> redirected throughout the topology
> -
>
> Key: KAFKA-12503
> URL: https://issues.apache.org/jira/browse/KAFKA-12503
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Blocker
> Fix For: 2.8.0
>
>
> When a thread is added, removed, or replaced, the cache is resized. The 
> resize was being performed from the thread initiating these calls. This can 
> cause records to be redirected to the wrong processor via the call to 
> `evict` in the cache: the eviction flushes records downstream to the next 
> processor after the cache, but if it runs on the wrong thread, the wrong 
> processor receives them. 
> This can cause 3 problems.
> 1) When the owner finishes processing the record, it sets the current node 
> to null in the processor context, and this then causes the other processor 
> to throw `StreamsException: Current node is unknown.` 
> 2) Depending on the types involved, it can cause a ClassCastException, as 
> the record is of a different type. Mostly this happened when the value types 
> differed inside the map node following a toStream call.
> 3) A silent issue: it could cause data to be processed by the wrong node and 
> corrupt the data. We have not been able to confirm this last one, but it is 
> the most dangerous in many ways.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304519#comment-17304519
 ] 

Chris Egerton commented on KAFKA-12463:
---

Thanks Randall--
{quote}I agree with you that we should fix the behavior. But the fix will 
appear only in certain releases, and not all users will be able to upgrade to 
those releases to get the fix. So, documenting the simple workaround will help 
users in those situations. I suggested documenting the workaround here to help 
any users that do stumble upon this issue when searching for a potential fix, 
regardless of whether the workaround is also documented elsewhere.
{quote}
That's an interesting take... I guess the difference I perceive here is that 
this is a proposed improvement, not really a fix. I don't know if any users are 
going to stumble onto this after something breaks since it doesn't address 
anything breaking to begin with. Either way, I guess we can try to iron out the 
workaround a little more here (will do now) but hopefully we can put something 
in the public-facing docs for Connect (maybe as part of upgrade notes if we 
change the default consumer partition assignment strategy) in addition to that; 
seems like that might get more of the target audience here.


With regards to steps forward and your question, [~ableegoldman], I wasn't 
certain one way or the other about round robin vs cooperative sticky 
assignment. I had a few thoughts:
 * When the set of task configurations changes, the advantages of stickiness or 
the cooperative protocol are basically irrelevant since each task and its 
accompanying consumer is brought down and a new one is brought up in its place.
 * In a completely stable world where consumer assignment never changes, round 
robin would be ideal (out of the ones available right now) as it'd guarantee 
as-even-as-possible spread of partitions within the same topic across tasks.
 * Otherwise, the only times the cooperative protocol might come into play are:
 ** When a consumer subscription is updated (which would cause a consumer 
rebalance but keep all sink tasks running)
 ** When a task is started or shut down without the connector being 
reconfigured (which may happen when a single task fails, a failed task is 
restarted, a new worker joins the group, or an existing worker leaves the group)
 ** When the consumer for a task falls out of the group (likely because the 
task is taking too long to process data provided to it by the framework).
 * Under most of these scenarios, stickiness would provide no benefit as any 
time a consumer is created, a new task is brought up in its place. The only 
exception is an update to a consumer subscription, but even that would require 
some changes to how Connect invokes 
[SinkTask::open|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection-]
 and 
[SinkTask::close|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close-java.util.Collection-]
 to basically fake cooperative rebalancing in the way that's proposed in the 
[StickyAssignor 
docs|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html]
 under the "Impact on {{ConsumerRebalanceListener}}" section.

The benefits of the sticky assignor seem pretty slim, and would require some 
one-off tooling from Connect to basically re-implement the cooperative 
protocol. This is why I'm personally not in favor of it, but would love to 
learn more if there's something I'm missing here.

So with all that in mind, we can transform the question into whether it's 
important to favor any of the scenarios outlined above where cooperative 
rebalancing might be of some benefit, and if not, opt to use the round robin 
assignor. There's one that comes to mind that I think might be worth 
considering and counts in favor of using the cooperative sticky assignor: if 
there's any kind of tooling in place that restarts failed tasks automatically, 
there will be significant consumer churn as consumers may rapidly fall out of 
and join the group.

I think this scenario is going to become more and more common as adoption of 
Connect increases and both it and the tooling around it mature, and as a 
result, I'm gently in favor of trying to use the {{CooperativeStickyAssignor}} 
now, or at least, when it becomes possible in the Connect framework once work 
on KAFKA-12477 and KAFKA-12487 completes.

I raised this point on the PR but it bears repeating here: we might want to 
reason through this carefully since we only get one shot to do this kind of 
automated upgrade before the next one gets more complicated (the list of 
assignors will grow from one to two this time around; it'll either have to grow 
from two to three the next, or we'll have to risk breaking changes for users 
who skip an upgrade step). I raised the round robin assignor as an option 

[jira] [Commented] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304518#comment-17304518
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12503:


Thanks for the writeup! If anyone reading this wants to check out the code for 
themselves, the methods and classes of note are (1) the ProcessorContext, which 
tracks a "current node" and forwards records to that node as they flow through 
the topology, (2) StreamTask#process, where a StreamThread picks up the next 
record, sets the current node on the context to the SourceNode, processes the 
record through the subtopology, and then sets the current node back to null, 
and (3) TimestampedCacheFlushListener, which is tied to a cache and the 
processor node immediately downstream of that cache. The listener receives 
records that have been evicted, saves the current node on the context as the 
`prevNode`, then sets the current node on the context, forwards the record, and 
resets the current node to `prevNode`.

The `StreamsException: Current node is unknown` must have occurred because the 
StreamThread owning this task had finished processing the record and set the 
current node to null when another thread hit the injected exception and tried 
to resize the cache. When the evicted record ended up in 
TimestampedCacheFlushListener#apply, the listener saved "null" as its previous 
node, processed the record with its own node, and then reset it to null. If the 
other StreamThread had begun processing this task in the meantime, it would 
suddenly find its current node to be null and throw the exception we see.

The ClassCastException is similar, but instead of the listener setting the 
current node to null while the StreamTask was in the middle of processing, it 
must have set the current node to some other node elsewhere in the topology. If 
this other node has different input/output types, we would run into a 
ClassCastException as it forwards, e.g., a Long to a downstream node that was 
expecting a Change.

Of course these are the race conditions under which we ran into visible 
symptoms, but as Walker mentioned the more dangerous possibility is all the 
other times when a foreign processor is inserted into the middle of the 
subtopology but the data types match and therefore no exception is thrown. In 
these cases we would be silently corrupting the data.
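
To make the interleaving concrete, here is a toy sketch (plain Java, not the 
actual Streams classes) of two threads sharing one mutable current-node slot 
the way the ProcessorContext does per task:
{code:java}
import java.util.concurrent.atomic.AtomicReference;

public class CurrentNodeRace {
    static final AtomicReference<String> currentNode = new AtomicReference<>();

    public static void main(String[] args) throws InterruptedException {
        final Thread processor = new Thread(() -> {
            currentNode.set("source-node"); // StreamTask#process sets the source node
            // ... process the record through the subtopology ...
            currentNode.set(null);          // and resets it to null when done
        });
        final Thread resizer = new Thread(() -> {
            final String prev = currentNode.get(); // listener saves prevNode -- may be null
            currentNode.set("cache-downstream-node");
            // ... forward the evicted record ...
            currentNode.set(prev);          // may clobber whatever the processor set
        });
        processor.start();
        resizer.start();
        processor.join();
        resizer.join();
        System.out.println("current node after both threads: " + currentNode.get());
    }
}
{code}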

Obviously one followup question we should ask is why it was so easy for one 
thread to process records that belong to another in the first place? We should 
consider some kind of safety mechanism to ensure single-threaded access to the 
ProcessorContext and task directories -- turns out the locking mechanism 
doesn't actually protect against multithreaded access unless we explicitly ask 
it whether we can lock it or not.

> Resizing the thread cache in a non thread safe way can cause records to be 
> redirected throughout the topology
> -
>
> Key: KAFKA-12503
> URL: https://issues.apache.org/jira/browse/KAFKA-12503
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Walker Carlson
>Priority: Blocker
> Fix For: 2.8.0
>
>
> When a thread is added, removed, or replaced, the cache is resized. The 
> resize was being performed from the thread initiating these calls. This can 
> cause records to be redirected to the wrong processor via the call to 
> `evict` in the cache: the eviction flushes records downstream to the next 
> processor after the cache, but if it runs on the wrong thread, the wrong 
> processor receives them. 
> This can cause 3 problems.
> 1) When the owner finishes processing the record, it sets the current node 
> to null in the processor context, and this then causes the other processor 
> to throw `StreamsException: Current node is unknown.` 
> 2) Depending on the types involved, it can cause a ClassCastException, as 
> the record is of a different type. Mostly this happened when the value types 
> differed inside the map node following a toStream call.
> 3) A silent issue: it could cause data to be processed by the wrong node and 
> corrupt the data. We have not been able to confirm this last one, but it is 
> the most dangerous in many ways.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology

2021-03-18 Thread Walker Carlson (Jira)
Walker Carlson created KAFKA-12503:
--

 Summary: Resizing the thread cache in a non thread safe way can 
cause records to be redirected throughout the topology
 Key: KAFKA-12503
 URL: https://issues.apache.org/jira/browse/KAFKA-12503
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.0
Reporter: Walker Carlson
 Fix For: 2.8.0


When a thread is added, removed, or replaced, the cache is resized. The resize 
was being performed from the thread initiating these calls. This can cause 
records to be redirected to the wrong processor via the call to `evict` in the 
cache: the eviction flushes records downstream to the next processor after the 
cache, but if it runs on the wrong thread, the wrong processor receives them. 

This can cause 3 problems.

1) When the owner finishes processing the record, it sets the current node to 
null in the processor context, and this then causes the other processor to 
throw `StreamsException: Current node is unknown.` 

2) Depending on the types involved, it can cause a ClassCastException, as the 
record is of a different type. Mostly this happened when the value types 
differed inside the map node following a toStream call.

3) A silent issue: it could cause data to be processed by the wrong node and 
corrupt the data. We have not been able to confirm this last one, but it is 
the most dangerous in many ways.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10357: MINOR: KIP-500 Cluster ID is a String

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10357:
URL: https://github.com/apache/kafka/pull/10357#discussion_r597285632



##
File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala
##
@@ -106,14 +105,8 @@ object MetaProperties {
 value.getOrElse(throw new RuntimeException(s"Failed to find required 
property $key."))
   }
 
-  def requireClusterId(properties: RawMetaProperties): Uuid = {
-val value = require(ClusterIdKey, properties.clusterId)
-try {
-  Uuid.fromString(value)
-} catch {
-  case e: Throwable => throw new RuntimeException(s"Failed to parse 
$ClusterIdKey property " +
-s"as a UUID: ${e.getMessage}")
-}
+  def requireClusterId(properties: RawMetaProperties): String = {

Review comment:
   We may as well get rid of this method. We are already using 
`require(NodeIdKey, properties.nodeId)` above.

##
File path: core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala
##
@@ -81,21 +80,21 @@ class BrokerMetadataCheckpointTest {
   }
 
   @Test
-  def testMetaPropertiesDoesNotAllowHexEncodedUUIDs(): Unit = {
+  def testMetaPropertiesAllowsHexEncodedUUIDs(): Unit = {
 val properties = new RawMetaProperties()
 properties.version = 1
 properties.clusterId = "7bc79ca1-9746-42a3-a35a-efb3cde44492"
 properties.nodeId = 1
-assertThrows(classOf[RuntimeException], () => 
MetaProperties.parse(properties))
+MetaProperties.parse(properties)

Review comment:
   Can we assert something about the return value? For example, we could 
follow the pattern in `testCreateMetadataProperties`.

##
File path: core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
##
@@ -32,7 +32,7 @@ class KafkaRaftServerTest {
 
   @Test
   def testSuccessfulLoadMetaProperties(): Unit = {
-val clusterId = Uuid.randomUuid()
+val clusterId = Uuid.randomUuid().toString

Review comment:
   Probably doesn't matter too much, but perhaps we could hard-code all of 
these to some string such as "JgxuGe9URy-E-ceaL04lEw" from the other test. The 
way it's written after this change looks a little odd.

##
File path: core/src/main/scala/kafka/tools/StorageTool.scala
##
@@ -198,7 +198,7 @@ object StorageTool extends Logging {
 s"does not appear to be a valid UUID: ${e.getMessage}")
 }
 require(config.nodeId >= 0, s"The node.id must be set to a non-negative 
integer.")
-new MetaProperties(effectiveClusterId, config.nodeId)
+new MetaProperties(effectiveClusterId.toString, config.nodeId)

Review comment:
   Should we also remove the logic above which requires that the clusterId 
parses as a UUID?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#issuecomment-802350667


   @wcarlson5 that's a good question, and I totally agree we should be more 
careful about that. In theory we should always clean up all state, including 
unlocking task directories, during a thread's shutdown, but it's possible 
something slips through and a task directory gets orphaned. I filed a ticket to 
address this a while back: see 
[KAFKA-10563](https://issues.apache.org/jira/browse/KAFKA-10563)
   
   That said, I think it's somewhat of an orthogonal concern and would prefer 
to do that in a separate PR to keep this one light. I don't think the changes 
here make that any better or worse -- if a dying thread orphaned some tasks, we 
weren't cleaning up the filesystem locks either.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597281724



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -477,15 +441,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final 
long cleanupDelayMs) {
 exception
 );
 } finally {
-try {
-unlock(id);
-} catch (final IOException exception) {
-log.warn(
-String.format("%s Swallowed the following 
exception during unlocking after deletion of obsolete " +
-"state directory %s for task %s:", 
logPrefix(), dirName, id),
-exception
-);
-}
+unlock(id);

Review comment:
   I think we need to keep that one because `Utils.delete` can throw 
IOException too




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10357: MINOR: KIP-500 Cluster ID is a String

2021-03-18 Thread GitBox


rondagostino commented on pull request #10357:
URL: https://github.com/apache/kafka/pull/10357#issuecomment-802344461


   This needs to be cherry-picked to 2.8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #10357: MINOR: KIP-500 Cluster ID is a String

2021-03-18 Thread GitBox


rondagostino opened a new pull request #10357:
URL: https://github.com/apache/kafka/pull/10357


   It is possible that an existing Cluster ID in Zookeeper is not convertible 
to a UUID.  KIP-500 cannot constrain Cluster IDs to be convertible to a UUID 
and still expect such a cluster to be able to upgrade to KIP-500.  This patch 
removes this UUID constraint and makes KIP-500 consistent with existing RPCs, 
which treat Cluster ID as a String.
   
   It is unlikely that an existing Cluster ID in Zookeeper would not be 
convertible to a UUID because the Cluster ID is auto-generated in the form of a 
UUID and stored in ZooKeeper as soon as the first broker connects.  However, 
there is nothing preventing someone from generating a Cluster ID that does not 
conform to the UUID format and manually storing/bootstrapping that into 
ZooKeeper before the first broker connects.
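   
   For instance (a minimal sketch; the cluster ID value below is made up), 
`Uuid.fromString` rejects anything that is not a base64-encoded UUID:
   ```java
   import org.apache.kafka.common.Uuid;

   public class ClusterIdCheck {
       public static void main(String[] args) {
           // hypothetical hand-bootstrapped value stored in ZooKeeper
           final String legacyClusterId = "my-hand-rolled-cluster-id";
           try {
               Uuid.fromString(legacyClusterId); // throws: not a base64-encoded UUID
           } catch (final Exception e) {
               System.out.println("Failed to parse as a UUID: " + e.getMessage());
           }
       }
   }
   ```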
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r597278840



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -814,6 +822,10 @@ private EndQuorumEpochResponseData 
handleEndQuorumEpochRequest(
 ) throws IOException {
 EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) 
requestMetadata.data;
 
+if (!hasValidClusterId(request)) {

Review comment:
   I think a better way to do this is to modify `validateVoterOnlyRequest` 
and `validateLeaderOnlyRequest` so that we pass the clusterId. Then we can get 
rid of `getClusterId`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597278900



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
-return true;
-} else if (lockAndOwner != null) {
-// another thread owns the lock
-return false;
-}
-
-try {
-lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
-} catch (final ProcessorStateException e) {
-// directoryForTask could be throwing an exception if another 
thread
-// has concurrently deleted the directory
-return false;
-}
-
-final FileChannel channel;
-
-try {
-channel = getOrCreateFileChannel(taskId, lockFile.toPath());
-} catch (final NoSuchFileException e) {
-// FileChannel.open(..) could throw NoSuchFileException when there 
is another thread
-// concurrently deleting the parent directory (i.e. the directory 
of the taskId) of the lock
-// file, in this case we will return immediately indicating 
locking failed.
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {
+log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+// we already own the lock
+return true;
+} else {
+// another thread owns the lock
+return false;
+}
+} else if (!stateDir.exists()) {

Review comment:
   Good point -- I put this in there because there was a test for this 
behavior, but actually that makes no sense. Will do




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10289:
URL: https://github.com/apache/kafka/pull/10289#discussion_r597274570



##
File path: checkstyle/suppressions.xml
##
@@ -68,7 +68,7 @@
   
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
 
+  
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/>

Review comment:
   I agree it is unfortunate. There are probably ways we can improve this. 
For example, this logic smells a little bit:
   ```java
   if (quorum.isLeader()) {
   logger.debug("Rejecting vote request {} with epoch {} since we 
are already leader on that epoch",
   request, candidateEpoch);
   voteGranted = false;
   } else if (quorum.isCandidate()) {
   logger.debug("Rejecting vote request {} with epoch {} since we 
are already candidate on that epoch",
   request, candidateEpoch);
   voteGranted = false;
   } else if (quorum.isResigned()) {
   logger.debug("Rejecting vote request {} with epoch {} since we 
have resigned as candidate/leader in this epoch",
   request, candidateEpoch);
   voteGranted = false;
   } else if (quorum.isFollower()) {
   FollowerState state = quorum.followerStateOrThrow();
   logger.debug("Rejecting vote request {} with epoch {} since we 
already have a leader {} on that epoch",
   request, candidateEpoch, state.leaderId());
   voteGranted = false;
   ```
   It might be possible to push this logic into `EpochState`, or at least to 
make use of the `name()` method in the logging. @dengziming would you be 
interested in following up on this separately?
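   
   For example, something along these lines (a sketch only; `currentEpochState()` 
is an assumed accessor, not the real `QuorumState` API):
   ```java
   // Sketch: let the state describe itself so one log line can replace the
   // four near-identical branches above.
   logger.debug("Rejecting vote request {} with epoch {} since we are already {} in that epoch",
       request, candidateEpoch, quorum.currentEpochState().name());
   voteGranted = false;
   ```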




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597260954



##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
 batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, 
epoch + 1)
+assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 10
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, 
epoch - 1)
+assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
   What I mean is just using the variable that we already have defined.
   ```scala
   val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
   ...
   assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
   ```

##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
 batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, 
epoch + 1)
+assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 10
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, 
epoch - 1)
+assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), 
resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
   What I mean is just using the value that we already have defined.
   ```scala
   val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
   ...
   assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch())
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #10356: cache size set by the thread

2021-03-18 Thread GitBox


wcarlson5 opened a new pull request #10356:
URL: https://github.com/apache/kafka/pull/10356


   Make it so threads do not directly resize other threads' caches
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304452#comment-17304452
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12463:


Out of curiosity, why use the RoundRobinAssignor rather than the StickyAssignor 
if you want to avoid cooperative? I'm not a Connect expert so maybe there's 
some nuance here, but the StickyAssignor is generally preferred (we were 
planning to make that the default in 3.0 before the CooperativeStickyAssignor 
came along)

> Update default consumer partition assignor for sink tasks
> -
>
> Key: KAFKA-12463
> URL: https://issues.apache.org/jira/browse/KAFKA-12463
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>
> Kafka consumers have a pluggable [partition assignment 
> interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html]
>  that comes with several out-of-the-box implementations including the 
> [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html],
>  
> [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html],
>  
> [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html],
>  and 
> [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html].
> If no partition assignor is configured with a consumer, the {{RangeAssignor}} 
> is used by default. Although there are some benefits to this assignor 
> including stability of assignment across generations and simplicity of 
> design, it comes with a major drawback: the number of active consumers in a 
> group is limited to the number of partitions in the topic(s) with the most 
> partitions. For an example of the worst case, in a consumer group where every 
> member is subscribed to ten topics that each have one partition, only one 
> member of that group will be assigned any topic partitions.
> This can end up producing counterintuitive and even frustrating behavior when 
> a sink connector is brought up with N tasks to read from some collection of 
> topics with a total of N topic partitions, but some tasks end up idling and 
> not processing any data.
> h3. Proposed Change
> *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below 
> will not work as consumers will still perform eager rebalancing as long as at 
> least one of the partition assignors they are configured with does not 
> support cooperative rebalancing. KAFKA-12487 should also be addressed before 
> configuring any Connect worker to use the {{CooperativeStickyAssignor}} for 
> any sink connectors.*
> [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol]
>  introduced the {{CooperativeStickyAssignor}}, which seeks to provide a 
> stable assignment across generations wherever possible, provide the most even 
> assignment possible (taking into account possible differences in 
> subscriptions across consumers in the group), and allow consumers to continue 
> processing data during rebalance. The documentation for the assignor states 
> that "Users should prefer this assignor for newer clusters."
> We should alter the default consumer configuration for sink tasks to use the 
> new {{CooperativeStickyAssignor}}. In order to do this in a 
> backwards-compatible fashion that also enables rolling upgrades, this should 
> be implemented by changing the {{Worker}} to set the following on the 
> consumer configuration created for each sink connector task:
> {code:java}
> partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor
> {code}
> This way, consumer groups for sink connectors on Connect clusters in the 
> process of being upgraded will continue to use the {{RangeAssignor}} until 
> all workers in the cluster have been upgraded, and then will switch over to 
> the new {{CooperativeStickyAssignor}} automatically. But, this setting will 
> be overwritten by any user-specified 
> {{consumer.partition.assignment.strategy}} property in the worker 
> configuration, and by any user-specified 
> {{consumer.override.partition.assignment.strategy}} property in a sink 
> connector configuration when per-connector client overrides is enabled in the 
> worker config with {{connector.client.config.override.policy=ALL}}.
> This improvement is viable as far back as -2.3- 2.4, when the 
> {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug 
> fix, should only 

[GitHub] [kafka] ableegoldman commented on pull request #10355: KAFKA-12500: fix memory leak in thread cache

2021-03-18 Thread GitBox


ableegoldman commented on pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#issuecomment-802298961


   @vvcephei yeah I'm on it, just wanted to see if there were any concerns 
about the proposed fix first. I'll ping you again when it's ready


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12502) Quorum controller should return topic configs in CreateTopic response

2021-03-18 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-12502:
---

 Summary: Quorum controller should return topic configs in 
CreateTopic response
 Key: KAFKA-12502
 URL: https://issues.apache.org/jira/browse/KAFKA-12502
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


Configs were added to the response in version 5. 
{code}
  { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "5+", 
"nullableVersions": "5+", "ignorable": true,
"about": "Configuration of the topic.", "fields": [
{ "name": "Name", "type": "string", "versions": "5+",
  "about": "The configuration name." },
{ "name": "Value", "type": "string", "versions": "5+", 
"nullableVersions": "5+",
  "about": "The configuration value." },
{ "name": "ReadOnly", "type": "bool", "versions": "5+",
  "about": "True if the configuration is read-only." },
{ "name": "ConfigSource", "type": "int8", "versions": "5+", "default": 
"-1", "ignorable": true,
  "about": "The configuration source." },
{ "name": "IsSensitive", "type": "bool", "versions": "5+",
  "about": "True if this configuration is sensitive." }
  ]}
{code}
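For reference, this is the public API that surfaces those fields (a minimal 
sketch; the bootstrap address and topic name are placeholders):
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicConfigs {
    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            final CreateTopicsResult result = admin.createTopics(
                Collections.singleton(new NewTopic("quorum-test", 1, (short) 1)));
            // config() is backed by the Configs field above; it is only
            // populated when the controller fills it in for v5+ responses
            final Config config = result.config("quorum-test").get();
            config.entries().forEach(entry ->
                System.out.println(entry.name() + " = " + entry.value()));
        }
    }
}
{code}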



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


ijuma commented on pull request #10354:
URL: https://github.com/apache/kafka/pull/10354#issuecomment-802273720


   Also cherry-picked to 2.8.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on pull request #10351: MINOR: use new method to get number of topics in DeleteTopicsRequest

2021-03-18 Thread GitBox


jolshan commented on pull request #10351:
URL: https://github.com/apache/kafka/pull/10351#issuecomment-802271057


   Build was broken due to an unrelated change, but should be fixed now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12501) KafkaClusterTestKit should support mixed-mode testing

2021-03-18 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-12501:
-
Description: 
Follow-on from https://github.com/apache/kafka/pull/10220

We should allow for mixed-mode testing with TestKit. Currently, brokers and 
controllers nodes are run in their respective modes only (where each has only 
one process role).

It would be nice to allow for "mixed-mode" where one node has both the broker 
and controller role. This would help us write tests that verify that all the 
various listeners are working properly on a single node running both roles.

Additionally, it would be nice to support observer controllers along with 
voting controllers

As [~hachikuji] [pointed 
out|https://github.com/apache/kafka/pull/10220#discussion_r588571846], we could 
probably eliminate some of the manual node building in the testkit code by 
using KafkaRaftServer directly. 

  was:
Follow-on from https://github.com/apache/kafka/pull/10220

We should allow for mixed-mode testing with TestKit. Currently, brokers and 
controllers nodes are run in their respective modes only (where each has only 
one process role).

It would be nice to allow for "mixed-mode" where one node has both the broker 
and controller role. This would help us write tests that verify that all the 
various listeners are working properly on a single node running both roles.

As [~hachikuji] [pointed 
out|https://github.com/apache/kafka/pull/10220#discussion_r588571846], we could 
probably eliminate some of the manual node building in the testkit code by 
using KafkaRaftServer directly. 


> KafkaClusterTestKit should support mixed-mode testing
> -
>
> Key: KAFKA-12501
> URL: https://issues.apache.org/jira/browse/KAFKA-12501
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Arthur
>Priority: Minor
>
> Follow-on from https://github.com/apache/kafka/pull/10220
> We should allow for mixed-mode testing with TestKit. Currently, brokers and 
> controllers nodes are run in their respective modes only (where each has only 
> one process role).
> It would be nice to allow for "mixed-mode" where one node has both the broker 
> and controller role. This would help us write tests that verify that all the 
> various listeners are working properly on a single node running both roles.
> Additionally, it would be nice to support observer controllers along with 
> voting controllers
> As [~hachikuji] [pointed 
> out|https://github.com/apache/kafka/pull/10220#discussion_r588571846], we 
> could probably eliminate some of the manual node building in the testkit code 
> by using KafkaRaftServer directly. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #10355: KAFKA-12500: fix memory leak in thread cache

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#discussion_r597217235



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 return Optional.empty();
 }
 
-// Returns the number of threads that are not in the DEAD state -- use 
this over threads.size()
+// Returns the number of threads that are not in the DEAD or 
PENDING_SHUTDOWN state -- use this over threads.size()
 private int getNumLiveStreamThreads() {
 final AtomicInteger numLiveThreads = new AtomicInteger(0);
 synchronized (threads) {
 processStreamThread(thread -> {
 if (thread.state() == StreamThread.State.DEAD) {
+log.debug("Trimming thread {} from the threads list since 
its state is {}", thread.getName(), StreamThread.State.DEAD);
 threads.remove(thread);
+} else if (thread.state() == 
StreamThread.State.PENDING_SHUTDOWN) {

Review comment:
   You can file an improvement ticket and list some ideas for how to handle 
this (eg scheduler that periodically skims the `threads` list and resizes the 
cache when it finds any DEAD threads to trim) but I'm optimistic this won't be 
an issue for now 
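   
   For example, a rough sketch of that scheduler idea (the resize helpers here 
are hypothetical):
   ```java
   // Hypothetical sketch: periodically trim DEAD threads and resize the cache
   // outside the add/remove path (ScheduledExecutorService from java.util.concurrent).
   final ScheduledExecutorService trimmer = Executors.newSingleThreadScheduledExecutor();
   trimmer.scheduleAtFixedRate(() -> {
       synchronized (threads) {
           if (threads.removeIf(t -> t.state() == StreamThread.State.DEAD)) {
               resizeThreadCache(getCacheSizePerThread(threads.size())); // hypothetical helpers
           }
       }
   }, 1, 1, TimeUnit.MINUTES);
   ```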




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10355: KAFKA-12500: fix memory leak in thread cache

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#discussion_r597215093



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 return Optional.empty();
 }
 
-// Returns the number of threads that are not in the DEAD state -- use 
this over threads.size()
+// Returns the number of threads that are not in the DEAD or 
PENDING_SHUTDOWN state -- use this over threads.size()
 private int getNumLiveStreamThreads() {
 final AtomicInteger numLiveThreads = new AtomicInteger(0);
 synchronized (threads) {
 processStreamThread(thread -> {
 if (thread.state() == StreamThread.State.DEAD) {
+log.debug("Trimming thread {} from the threads list since 
its state is {}", thread.getName(), StreamThread.State.DEAD);
 threads.remove(thread);
+} else if (thread.state() == 
StreamThread.State.PENDING_SHUTDOWN) {

Review comment:
   Yep -- but imo this should be pretty rare. To hit an OOM you would need 
to have a relatively huge cache, and few total threads (so that the cache size 
per thread is still large), and then you'd need the new thread to start up, 
rebalance to get tasks, finish initializing and restoring them and then start 
consuming enough data to fill up the cache all before the old thread gets 
around to closing its state stores. 
   Sure, threads can sometimes hang during termination but in my experience 
that's usually after the store closure (and hopefully hanging is itself rare).
   
   My point is, this should be good enough for now, and I'd like to stick with 
the simplest solution this late in the game.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working

2021-03-18 Thread GitBox


mumrah commented on a change in pull request #10220:
URL: https://github.com/apache/kafka/pull/10220#discussion_r597214017



##
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+private final static Logger log = 
LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+/**
+ * This class manages a future which is completed with the proper value for
+ * controller.quorum.voters once the randomly assigned ports for all the 
controllers are
+ * known.
+ */
+private static class ControllerQuorumVotersFutureManager implements 
AutoCloseable {
+private final int expectedControllers;
+private final CompletableFuture<List<String>> future = new 
CompletableFuture<>();
+private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+ControllerQuorumVotersFutureManager(int expectedControllers) {
+this.expectedControllers = expectedControllers;
+}
+
+synchronized void registerPort(int nodeId, int port) {
+controllerPorts.put(nodeId, port);
+if (controllerPorts.size() >= expectedControllers) {
+future.complete(controllerPorts.entrySet().stream().
+map(e -> String.format("%d@localhost:%d", e.getKey(), 
e.getValue())).
+collect(Collectors.toList()));
+}
+}
+
+void fail(Throwable e) {
+future.completeExceptionally(e);
+}
+
+@Override
+public void close() {
+future.cancel(true);
+}
+}
+
+public static class Builder {
+private TestKitNodes nodes;
+private Map<String, String> configProps = new HashMap<>();
+
+public Builder(TestKitNodes nodes) {
+this.nodes = nodes;
+}
+
+public Builder setConfigProp(String key, String value) {
+this.configProps.put(key, value);
+return this;
+}
+
+public KafkaClusterTestKit build() throws Exception {
+Map<Integer, ControllerServer> controllers = new HashMap<>();
+Map<Integer, BrokerServer> kip500Brokers = new 

[GitHub] [kafka] ijuma merged pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


ijuma merged pull request #10354:
URL: https://github.com/apache/kafka/pull/10354


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


ijuma commented on pull request #10354:
URL: https://github.com/apache/kafka/pull/10354#issuecomment-802259895


   Merged to unbreak the build, we can improve further in future PRs.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12385) Remove FetchResponse#responseData

2021-03-18 Thread Chun-Hao Tang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304418#comment-17304418
 ] 

Chun-Hao Tang commented on KAFKA-12385:
---

[~chia7712] Can I work on this?

> Remove FetchResponse#responseData
> -
>
> Key: KAFKA-12385
> URL: https://issues.apache.org/jira/browse/KAFKA-12385
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Major
>
> reference to [https://github.com/apache/kafka/pull/9758#discussion_r584142074]
> We can rewrite related code to avoid using stale data structure.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] wcarlson5 commented on a change in pull request #10355: KAFKA-12500: fix memory leak in thread cache

2021-03-18 Thread GitBox


wcarlson5 commented on a change in pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#discussion_r597208785



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -971,6 +972,7 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 synchronized (changeThreadCount) {
 threadIdx = getNextThreadIndex();
 cacheSizePerThread = 
getCacheSizePerThread(getNumLiveStreamThreads() + 1);
+log.info("Adding a new StreamThread with thread id {}; new 
cache size per thread is {}", threadIdx, cacheSizePerThread);

Review comment:
   Maybe log number of threads here too?

##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean 
hasGlobalTopology) {
 return Optional.empty();
 }
 
-// Returns the number of threads that are not in the DEAD state -- use 
this over threads.size()
+// Returns the number of threads that are not in the DEAD or 
PENDING_SHUTDOWN state -- use this over threads.size()
 private int getNumLiveStreamThreads() {
 final AtomicInteger numLiveThreads = new AtomicInteger(0);
 synchronized (threads) {
 processStreamThread(thread -> {
 if (thread.state() == StreamThread.State.DEAD) {
+log.debug("Trimming thread {} from the threads list since 
it's state is {}", thread.getName(), StreamThread.State.DEAD);
 threads.remove(thread);
+} else if (thread.state() == 
StreamThread.State.PENDING_SHUTDOWN) {

Review comment:
   are we risking a memory overflow with this?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10355: KAFKA-12500: fix memory leak in thread cache

2021-03-18 Thread GitBox


ableegoldman commented on pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#issuecomment-802252651


   @wcarlson5 @cadonna  @lct45 
   
   Should be cherrypicked to 2.8 cc @vvcephei 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman opened a new pull request #10355: KAFKA-12500: fix memory leak in thread cache

2021-03-18 Thread GitBox


ableegoldman opened a new pull request #10355:
URL: https://github.com/apache/kafka/pull/10355


   Need to exclude threads in `PENDING_SHUTDOWN` from the num live threads 
computation used to compute the new cache size per thread.
   
   Also adds some logging to help follow what's happening when a thread is 
added/removed/replaced
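   
   For context, the computation being fixed is roughly the following (a 
simplified sketch, assuming a fixed total cache budget split evenly across 
live threads):
   ```java
   // Simplified sketch: threads in DEAD or PENDING_SHUTDOWN must not be counted,
   // or the budget is shared with threads that are about to release their cache.
   static long getCacheSizePerThread(final long totalCacheBytes, final int numLiveThreads) {
       return numLiveThreads == 0 ? totalCacheBytes : totalCacheBytes / numLiveThreads;
   }
   ```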
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working

2021-03-18 Thread GitBox


mumrah commented on a change in pull request #10220:
URL: https://github.com/apache/kafka/pull/10220#discussion_r597202938



##
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+/**
+ * This class manages a future which is completed with the proper value for
+ * controller.quorum.voters once the randomly assigned ports for all the controllers are known.
+ */
+private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+private final int expectedControllers;
+private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+ControllerQuorumVotersFutureManager(int expectedControllers) {
+this.expectedControllers = expectedControllers;
+}
+
+synchronized void registerPort(int nodeId, int port) {
+controllerPorts.put(nodeId, port);
+if (controllerPorts.size() >= expectedControllers) {
+future.complete(controllerPorts.entrySet().stream().
+map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+collect(Collectors.toList()));
+}
+}
+
+void fail(Throwable e) {
+future.completeExceptionally(e);
+}
+
+@Override
+public void close() {
+future.cancel(true);
+}
+}
+
+public static class Builder {
+private TestKitNodes nodes;
+private Map<String, String> configProps = new HashMap<>();
+
+public Builder(TestKitNodes nodes) {
+this.nodes = nodes;
+}
+
+public Builder setConfigProp(String key, String value) {
+this.configProps.put(key, value);
+return this;
+}
+
+public KafkaClusterTestKit build() throws Exception {
+Map<Integer, ControllerServer> controllers = new HashMap<>();
+Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();

[jira] [Created] (KAFKA-12501) KafkaClusterTestKit should support mixed-mode testing

2021-03-18 Thread David Arthur (Jira)
David Arthur created KAFKA-12501:


 Summary: KafkaClusterTestKit should support mixed-mode testing
 Key: KAFKA-12501
 URL: https://issues.apache.org/jira/browse/KAFKA-12501
 Project: Kafka
  Issue Type: Bug
Reporter: David Arthur


Follow-on from https://github.com/apache/kafka/pull/10220

We should allow for mixed-mode testing with TestKit. Currently, broker and 
controller nodes are run in their respective modes only (where each has only 
one process role).

It would be nice to allow for "mixed-mode" where one node has both the broker 
and controller role. This would help us write tests that verify that all the 
various listeners are working properly on a single node running both roles.

As [~hachikuji] [pointed 
out|https://github.com/apache/kafka/pull/10220#discussion_r588571846], we could 
probably eliminate some of the manual node building in the testkit code by 
using KafkaRaftServer directly. 
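
For context, a node in mixed mode is just one configured with both KIP-500 roles; a minimal sketch of such a config, expressed programmatically (the property name is the standard KRaft one):

Properties props = new Properties();
props.put("process.roles", "broker,controller");  // one node carrying both roles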



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API

2021-03-18 Thread GitBox


hachikuji commented on a change in pull request #10275:
URL: https://github.com/apache/kafka/pull/10275#discussion_r597193539



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java
##
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.AbstractResponse;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+
+public interface AdminApiLookupStrategy<T> {
+
+/**
+ * Define the scope of a given key for lookup. Key lookups are complicated
+ * by the need to accommodate different batching mechanics. For example,
+ * a `Metadata` request supports arbitrary batching of topic partitions in
+ * order to discover partition leaders. This can be supported by returning
+ * a single scope object for all keys.
+ *
+ * On the other hand, `FindCoordinator` request only supports lookup of a
+ * single key. This can be supported by returning a different scope object
+ * for each lookup key.
+ *
+ * @param key the lookup key
+ *
+ * @return request scope indicating how lookup requests can be batched together
+ */
+RequestScope lookupScope(T key);
+
+/**
+ * Build the lookup request for a set of keys. The grouping of the keys is controlled
+ * through {@link #lookupScope(Object)}. In other words, each set of keys that map
+ * to the same request scope object will be sent to this method.
+ *
+ * @param keys the set of keys that require lookup
+ *
+ * @return a builder for the lookup request
+ */
+AbstractRequest.Builder<?> buildRequest(Set<T> keys);
+
+/**
+ * Callback that is invoked when a lookup request returns successfully. The handler
+ * should parse the response, check for errors, and return a result indicating
+ * which keys were mapped to a brokerId successfully and which keys received
+ * a fatal error (e.g. a topic authorization failure).
+ *
+ * Note that keys which receive a retriable error should be left out of the
+ * result. They will be retried automatically. For example, if the response of
+ * `FindCoordinator` request indicates an unavailable coordinator, then the key
+ * should be left out of the result so that the request will be retried.
+ *
+ * @param keys the set of keys from the associated request
+ * @param response the response received from the broker
+ *
+ * @return a result indicating which keys mapped successfully to a brokerId and
+ * which encountered a fatal error
+ */
+LookupResult<T> handleResponse(Set<T> keys, AbstractResponse response);
+
+class LookupResult<K> {
+public final Map<K, Integer> mappedKeys;
+public final Map<K, Throwable> failedKeys;
+
+public LookupResult(
+Map<K, Throwable> failedKeys,
+Map<K, Integer> mappedKeys
+) {
+this.failedKeys = Collections.unmodifiableMap(failedKeys);
+this.mappedKeys = Collections.unmodifiableMap(mappedKeys);
+}
+}
+
+interface RequestScope {
+default OptionalInt destinationBrokerId() {
+return OptionalInt.empty();
+}
+}

Review comment:
   I decided to pull this up to the top level. Let me know if that seems ok.
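
   For illustration, a hedged sketch of the two batching extremes the javadoc above describes (class names here are hypothetical):

   // Metadata-style: one shared scope object, so every key batches into a single request.
   class SingleRequestScope implements AdminApiLookupStrategy.RequestScope { }

   // FindCoordinator-style: a distinct scope per key (default identity equality),
   // so each key is looked up in its own request.
   class PerKeyScope implements AdminApiLookupStrategy.RequestScope {
       final Object key;
       PerKeyScope(Object key) { this.key = key; }
   }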




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12500) Cache memory is leaked after removing/replacing a StreamThread

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reassigned KAFKA-12500:
--

Assignee: A. Sophie Blee-Goldman

> Cache memory is leaked after removing/replacing a StreamThread
> --
>
> Key: KAFKA-12500
> URL: https://issues.apache.org/jira/browse/KAFKA-12500
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 2.8.0
>
>
> We currently leak the memory given to a StreamThread for its cache after it 
> gets shutdown due to being removed or replaced. 
> Removal:
> If the StreamThread is removing itself, or if a thread is being removed by an 
> external caller but fails to shutdown within the allotted time, it won't be 
> removed from the `threads` list to avoid freeing up its thread id while 
> still running. If the thread hasn't reached the DEAD state when we resize the 
> cache, the new cache size per thread is computed based on the number of live 
> threads, which includes the removed thread at this point.
> Replacement:
> When a thread is replaced, we first shutdown that thread but hold off on 
> removing it from the threads list until it's DEAD. Immediately after the 
> shutdown we call addStreamThread to start up a new thread, which resizes the 
> cache according to num-live-threads + 1.
> In both of these cases, the cache memory of the shutting down thread won't be 
> made available to the remaining threads.
> Note: in general this leak may not be permanent, as a subsequent event once 
> the thread has reached DEAD and been removed from the threads list will cause 
> its memory to be released. Of course, if the subsequent event is another 
> thread removal or replacement, then we just have a new memory leak.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


mumrah commented on pull request #10354:
URL: https://github.com/apache/kafka/pull/10354#issuecomment-802229547


   Maybe we should exclude all markdown files from the license check?
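
   (A hedged sketch: a single glob like `**/*.md` in the rat excludes list would cover KIP-500.md and any future markdown docs; the exact property name in the build script is an assumption.)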


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12500) Cache memory is leaked after removing/replacing a StreamThread

2021-03-18 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12500:
--

 Summary: Cache memory is leaked after removing/replacing a 
StreamThread
 Key: KAFKA-12500
 URL: https://issues.apache.org/jira/browse/KAFKA-12500
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 2.8.0


We currently leak the memory given to a StreamThread for its cache after it 
gets shutdown due to being removed or replaced. 

Removal:
If the StreamThread is removing itself, or if a thread is being removed by an 
external caller but fails to shutdown within the allotted time, it won't be 
removed from the `threads` list to avoid freeing up its thread id while still 
running. If the thread hasn't reached the DEAD state when we resize the cache, 
the new cache size per thread is computed based on the number of live threads, 
which includes the removed thread at this point.

Replacement:
When a thread is replaced, we first shutdown that thread but hold off on 
removing it from the threads list until it's DEAD. Immediately after the 
shutdown we call addStreamThread to start up a new thread, which resizes the 
cache according to num-live-threads + 1.

In both of these cases, the cache memory of the shutting down thread won't be 
made available to the remaining threads.

Note: in general this leak may not be permanent, as a subsequent event once the 
thread has reached DEAD and been removed from the threads list will cause its 
memory to be released. Of course, if the subsequent event is another thread 
removal or replacement, then we just have a new memory leak.
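
A worked example of the arithmetic behind the leak (numbers and names are illustrative):

long totalCacheSize = 100L * 1024 * 1024;  // 100 MB split across 4 threads, 25 MB each
int threadsCountedAsLive = 4;              // still includes the PENDING_SHUTDOWN thread
long cacheSizePerThread = totalCacheSize / threadsCountedAsLive;  // 25 MB
// The 3 surviving threads are assigned 3 * 25 MB = 75 MB in total, so 25 MB
// stays stranded until a later resize runs after the dead thread is trimmed.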



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


cmccabe commented on pull request #10354:
URL: https://github.com/apache/kafka/pull/10354#issuecomment-802214287


   Thanks, @jolshan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


vvcephei commented on pull request #10354:
URL: https://github.com/apache/kafka/pull/10354#issuecomment-802212427


   Thanks, @jolshan ! Will this need to be cherry-picked to 2.8?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597158051



##
File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java
##
@@ -16,40 +16,56 @@
  */
 package org.apache.kafka.raft;
 
+import java.util.Objects;
+
 public final class ValidOffsetAndEpoch {
-final private Type type;
+final private Kind kind;

Review comment:
   Agree with the existing design and the limitations of interfaces compared to case classes. Java 15's sealed classes (https://openjdk.java.net/jeps/360) would be a good alternative to plain interfaces here, since we want to limit extension. 
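
   A quick sketch of that alternative (sealed types were preview in Java 15 and finalized in Java 17, so this is illustrative only; the subtype names mirror the Kind values):

   public sealed interface ValidOffsetAndEpoch permits Valid, Diverging, Snapshot { }
   public final class Valid implements ValidOffsetAndEpoch { /* offsetAndEpoch, ... */ }
   public final class Diverging implements ValidOffsetAndEpoch { /* ... */ }
   public final class Snapshot implements ValidOffsetAndEpoch { /* ... */ }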




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


hachikuji commented on pull request #10354:
URL: https://github.com/apache/kafka/pull/10354#issuecomment-802206799


   Thanks for fixing this so quickly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10349: MINOR: Move `configurations.all` to be a child of `allprojects`

2021-03-18 Thread GitBox


ijuma merged pull request #10349:
URL: https://github.com/apache/kafka/pull/10349


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan opened a new pull request #10354: MINOR: Exclude KIP-500.md from rat check

2021-03-18 Thread GitBox


jolshan opened a new pull request #10354:
URL: https://github.com/apache/kafka/pull/10354


   Builds are failing since this file does not have a license. Similar .md 
files do not seem to have licenses, so I've added this file to the exclude list.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch

2021-03-18 Thread GitBox


rohitrmd commented on a change in pull request #10276:
URL: https://github.com/apache/kafka/pull/10276#discussion_r597144327



##
File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
##
@@ -485,6 +485,170 @@ final class KafkaMetadataLogTest {
 batchBuilder.build()
   }
 
+  @Test
+  def testValidateEpochGreaterThanLastKnownEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 1
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1)
+assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch())
+  }
+
+  @Test
+  def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = {
+val log = buildMetadataLog(tempDir, mockTime)
+
+val numberOfRecords = 10
+val epoch = 1
+
+append(log, numberOfRecords, epoch)
+log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords))
+
+val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch)
+TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot =>
+  snapshot.freeze()
+}
+assertTrue(log.deleteBeforeSnapshot(snapshotId))
+
+val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1)
+assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind)
+assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())

Review comment:
   @hachikuji can you explain what is meant by using snapshotId? Do you mean changing `assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())` to `assertEquals(new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch), resultOffsetAndEpoch.offsetAndEpoch())` and not use the local variables?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah opened a new pull request #10353: DO NOT MERGE: Add a failing unit test to check Gradle and Github behavior

2021-03-18 Thread GitBox


mumrah opened a new pull request #10353:
URL: https://github.com/apache/kafka/pull/10353


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2021-03-18 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-3813.
--
Resolution: Duplicate

> Let SourceConnector implementations access the offset reader
> 
>
> Key: KAFKA-3813
> URL: https://issues.apache.org/jira/browse/KAFKA-3813
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> When a source connector is started, having access to the 
> {{OffsetStorageReader}} would allow it to more intelligently configure its 
> tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
> implementations can access the offset reader (via the {{SourceTaskContext}}), 
> but the {{SourceConnector}} does not have access to the offset reader.)
> Of course, accessing the stored offsets is not likely to be useful for sink 
> connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Reopened] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2021-03-18 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch reopened KAFKA-3813:
--

> Let SourceConnector implementations access the offset reader
> 
>
> Key: KAFKA-3813
> URL: https://issues.apache.org/jira/browse/KAFKA-3813
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> When a source connector is started, having access to the 
> {{OffsetStorageReader}} would allow it to more intelligently configure its 
> tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
> implementations can access the offset reader (via the {{SourceTaskContext}}), 
> but the {{SourceConnector}} does not have access to the offset reader.)
> Of course, accessing the stored offsets is not likely to be useful for sink 
> connectors.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-3813) Let SourceConnector implementations access the offset reader

2021-03-18 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304330#comment-17304330
 ] 

Randall Hauch commented on KAFKA-3813:
--

[KIP-131|https://cwiki.apache.org/confluence/display/KAFKA/KIP-131+-+Add+access+to+OffsetStorageReader+from+SourceConnector]
 was implemented with KAFKA-4794.

> Let SourceConnector implementations access the offset reader
> 
>
> Key: KAFKA-3813
> URL: https://issues.apache.org/jira/browse/KAFKA-3813
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Priority: Major
>  Labels: needs-kip
>
> When a source connector is started, having access to the 
> {{OffsetStorageReader}} would allow it to more intelligently configure its 
> tasks based upon the stored offsets. (Currently only the {{SourceTask}} 
> implementations can access the offset reader (via the {{SourceTaskContext}}), 
> but the {{SourceConnector}} does not have access to the offset reader.)
> Of course, accessing the stored offsets is not likely to be useful for sink 
> connectors.
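
For reference, a minimal sketch of what KIP-131 enables inside a SourceConnector (method names follow the KIP and are assumptions here, not a quote of the final API):

OffsetStorageReader reader = context().offsetStorageReader();
Map<String, Object> lastOffset = reader.offset(Collections.singletonMap("file", "input.txt"));
// use lastOffset to configure tasks more intelligently in SourceConnector#taskConfigs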



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10344: MINOR: Remove use of NoSuchElementException

2021-03-18 Thread GitBox


hachikuji merged pull request #10344:
URL: https://github.com/apache/kafka/pull/10344


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



