[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597424785

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
Yeah I agree, I filed a ticket to improve the locking mechanism a little while back. Just wanted to keep this PR focused

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rodesai commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
rodesai commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597413503

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
never mind - I see this isn't a change from what was happening before. Still seems good to do, but not necessarily in this patch
[GitHub] [kafka] rodesai commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
rodesai commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597413180

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
seems dangerous to use the thread name to check ownership. might be safer to check reference equality on `Thread`
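
To illustrate the reviewer's suggestion, here is a minimal sketch of a lock registry that records the owning `Thread` object and compares by reference rather than by name. This is not the `StateDirectory` code from the PR: the class name `TaskLockRegistry`, the use of plain `String` task ids, and the `unlock` method are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the actual StateDirectory implementation:
// the registry stores the owning Thread itself instead of its name.
final class TaskLockRegistry {
    // Task ids are plain strings here for brevity (assumption; the PR uses TaskId).
    private final Map<String, Thread> lockedTasksToOwner = new HashMap<>();

    // Returns true if the calling thread now holds (or already held) the lock.
    synchronized boolean lock(final String taskId) {
        final Thread owner = lockedTasksToOwner.get(taskId);
        if (owner != null) {
            // Reference equality: two distinct threads that share a name are never confused.
            return owner == Thread.currentThread();
        }
        lockedTasksToOwner.put(taskId, Thread.currentThread());
        return true;
    }

    // Releases the lock only if the calling thread is the owner.
    synchronized boolean unlock(final String taskId) {
        if (lockedTasksToOwner.get(taskId) == Thread.currentThread()) {
            lockedTasksToOwner.remove(taskId);
            return true;
        }
        return false;
    }

    public static void main(final String[] args) throws InterruptedException {
        final TaskLockRegistry registry = new TaskLockRegistry();
        System.out.println(registry.lock("0_0")); // true: main thread takes the lock

        final boolean[] otherGotLock = new boolean[1];
        // A different Thread object that deliberately reuses the main thread's name.
        final Thread sameName = new Thread(
            () -> otherGotLock[0] = registry.lock("0_0"),
            Thread.currentThread().getName());
        sameName.start();
        sameName.join();
        System.out.println(otherGotLock[0]); // false: name matches, reference does not
    }
}
```

With a name-based check, the second thread in `main` would have been treated as the owner. The trade-off is that the map holds a strong reference to the `Thread`, so entries for dead threads still need to be purged.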
[jira] [Updated] (KAFKA-12508) Emit-on-change tables may lose updates on error or restart in at_least_once
[ https://issues.apache.org/jira/browse/KAFKA-12508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Habermann updated KAFKA-12508: --- Description: [KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams] added emit-on-change semantics to KTables that suppress updates for duplicate values. However, this may cause data loss in at_least_once topologies when records are retried from the last commit due to an error / restart / etc. Consider the following example: {code:java} streams.table(source, materialized) .toStream() .map(mayThrow()) .to(output){code} # Record A gets read # Record A is stored in the table # The update for record A is forwarded through the topology # Map() throws (or alternatively, any restart while the forwarded update was still being processed and not yet produced to the output topic) # The stream is restarted and "retries" from the last commit # Record A gets read again # The table will discard the update for record A because ## The value is the same ## The timestamp is the same # Eventually the stream will commit # There is absolutely no output for Record A even though we're running in at_least_once This behaviour does not seem intentional. [The emit-on-change logic explicitly forwards records that have the same value and an older timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50] This logic should probably be changed to also forward updates that have an older *or equal* timestamp. was: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams] Added emit-on-change semantics to KTables that suppress duplicate values. However, this may cause data loss in at_least_once topologies when records are retried from the last commit due to an error / restart / etc. 
Consider the following example: {code:java} streams.table(source, materialized) .toStream() .map(mayThrow()) .to(output){code} # Record A gets read # Record A is stored in the table # The update for record A is forwarded through the topology # Map() throws (or alternatively, any restart while the forwarded update was still being processed and not yet produced to the output topic) # The stream is restarted and "retries" from the last commit # Record A gets read again # The table will discard the update for record A because ## The value is the same ## The timestamp is the same # Eventually the stream will commit # There is absolutely no output for Record A even though we're running in at_least_once This behaviour does not seem intentional. [The emit-on-change logic explicitly forwards records that have the same value and an older timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50] This logic should probably be changed to also forward updates that have an older *or equal* timestamp. > Emit-on-change tables may lose updates on error or restart in at_least_once > --- > > Key: KAFKA-12508 > URL: https://issues.apache.org/jira/browse/KAFKA-12508 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.7.0, 2.6.1 >Reporter: Nico Habermann >Priority: Major > > [KIP-557|https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams] > added emit-on-change semantics to KTables that suppress updates for > duplicate values. > However, this may cause data loss in at_least_once topologies when records > are retried from the last commit due to an error / restart / etc. 
[jira] [Created] (KAFKA-12508) Emit-on-change tables may lose updates on error or restart in at_least_once
Nico Habermann created KAFKA-12508:
--

Summary: Emit-on-change tables may lose updates on error or restart in at_least_once
Key: KAFKA-12508
URL: https://issues.apache.org/jira/browse/KAFKA-12508
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.6.1, 2.7.0
Reporter: Nico Habermann

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-557%3A+Add+emit+on+change+support+for+Kafka+Streams]

Added emit-on-change semantics to KTables that suppress duplicate values.

However, this may cause data loss in at_least_once topologies when records are retried from the last commit due to an error / restart / etc.

Consider the following example:
{code:java}
streams.table(source, materialized)
    .toStream()
    .map(mayThrow())
    .to(output){code}

# Record A gets read
# Record A is stored in the table
# The update for record A is forwarded through the topology
# Map() throws (or alternatively, any restart while the forwarded update was still being processed and not yet produced to the output topic)
# The stream is restarted and "retries" from the last commit
# Record A gets read again
# The table will discard the update for record A because
## The value is the same
## The timestamp is the same
# Eventually the stream will commit
# There is absolutely no output for Record A even though we're running in at_least_once

This behaviour does not seem intentional. [The emit-on-change logic explicitly forwards records that have the same value and an older timestamp.|https://github.com/apache/kafka/blob/367eca083b44261d4e5fa8aa61b7990a8b35f8b0/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueAndTimestampSerializer.java#L50] This logic should probably be changed to also forward updates that have an older *or equal* timestamp.

--
This message was sent by Atlassian Jira (v8.3.4#803005)
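
The proposed change can be sketched as a single comparison. The code below is an illustration of the behaviour described in the ticket, not the logic in `ValueAndTimestampSerializer`: the class `EmitOnChangeSketch` and both method names are hypothetical, and values are modelled as raw byte arrays.

```java
import java.util.Arrays;

// Hypothetical sketch of the forwarding decision described in KAFKA-12508.
final class EmitOnChangeSketch {

    // Behaviour as described in the ticket: an update with an unchanged value
    // is forwarded only when its timestamp is strictly older than the stored one.
    static boolean forwardCurrent(final byte[] oldValue, final long oldTs,
                                  final byte[] newValue, final long newTs) {
        if (!Arrays.equals(oldValue, newValue)) {
            return true; // the value changed, always forward
        }
        return newTs < oldTs;
    }

    // Proposed behaviour: also forward when the timestamp is equal, which is
    // what a replay of the same record after a restart looks like.
    static boolean forwardProposed(final byte[] oldValue, final long oldTs,
                                   final byte[] newValue, final long newTs) {
        if (!Arrays.equals(oldValue, newValue)) {
            return true;
        }
        return newTs <= oldTs;
    }

    public static void main(final String[] args) {
        final byte[] recordA = {42};
        // Record A is re-read after a restart: same value, same timestamp.
        System.out.println(forwardCurrent(recordA, 100L, recordA, 100L));  // false: update is dropped
        System.out.println(forwardProposed(recordA, 100L, recordA, 100L)); // true: update is re-emitted
        // A genuine duplicate that arrives later is still suppressed.
        System.out.println(forwardProposed(recordA, 100L, recordA, 101L)); // false
    }
}
```

Under this reading, the only case whose outcome changes is the replayed record, so duplicate suppression for later-arriving identical values is unaffected.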
[GitHub] [kafka] cmccabe commented on pull request #10357: MINOR: KIP-500 Cluster ID is a String
cmccabe commented on pull request #10357:
URL: https://github.com/apache/kafka/pull/10357#issuecomment-802556947

@vvcephei : I like @hachikuji 's idea of merging this to 2.8. It's worth noting that the protocol we're changing here is a new one that was introduced by KIP-500 and is not used in the ZK code path, so the risk to existing code should be low.
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596443720

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ##
@@ -186,7 +186,7 @@ public ProcessorStateManager(final TaskId taskId,
 this.changelogReader = changelogReader;
 this.sourcePartitions = sourcePartitions;
-this.baseDir = stateDirectory.directoryForTask(taskId);
+this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);

Review comment:
Note: the only logical changes are in `StateDirectory` and `KafkaStreams`; the rest of the files were just touched by renaming this method to include the `getOrCreate` prefix. Sorry for the expanded surface area, I felt the renaming was merited since otherwise this critical functionality is easy to miss. There are also some non-renaming changes in `StateManagerUtil` and `TaskManager` that just remove the try-catch logic for the `IOException` that is no longer thrown.

For tests, you should focus on `StateDirectoryTest` and `StateManagerUtilTest`
[jira] [Commented] (KAFKA-10563) Make sure task directories don't remain locked by dead threads
[ https://issues.apache.org/jira/browse/KAFKA-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304637#comment-17304637 ]

A. Sophie Blee-Goldman commented on KAFKA-10563:

I partially addressed this on the side in another PR: https://github.com/apache/kafka/pull/10342#issuecomment-802542776

We clean up any orphaned task directories when the cleaner thread runs. As mentioned above, this is not perfect, since a task directory could remain blocked for up to 10 minutes (by default), which in the current architecture would also block progress on any other task assigned to that StreamThread. It's better than nothing, but we should still follow up and make sure it's not possible to leave locked task directories behind.

> Make sure task directories don't remain locked by dead threads
> --
>
> Key: KAFKA-10563
> URL: https://issues.apache.org/jira/browse/KAFKA-10563
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Priority: Major
>
> Most common/expected exceptions within Streams are handled gracefully, and
> the thread will make sure to clean up all resources such as task locks during
> shutdown. However, there are some instances where an unexpected exception
> such as an IllegalStateException can leave some resources orphaned.
> We have seen this happen to task directories after an IllegalStateException
> is hit during the TaskManager's rebalance handling logic – the Thread shuts
> down, but loses track of some tasks before unlocking them. This blocks any
> further work on that task by any other thread in the same instance.
> Previously we decided that this was "ok" because an IllegalStateException
> means all bets are off. But with the upcoming work of KIP-663 and KIP-671,
> users will be able to react smartly on dying threads and replace them with
> new ones, making it more important than ever to ensure that the application
> can continue on with no lasting repercussions of a thread death. If we allow
> users to revive/replace a thread that dies due to IllegalStateException, that
> thread should not be blocked from doing any work by the ghost of its
> predecessor.
> It might be easiest to just add some logic to the cleanup thread to verify
> all the existing locks against the list of live threads, and remove any
> zombie locks. But we probably want to do this purging more frequently than
> the cleanup thread runs (10min by default) – so maybe we can leverage the
> work in KIP-671 and have each thread purge any locks still owned by it after
> the uncaught exception handler runs, but before the thread dies.
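
The "verify all the existing locks against the list of live threads" idea can be sketched as follows. This is an illustration only, not the cleaner-thread code from PR #10342: `ZombieLockPurger` and `purgeOrphanedLocks` are hypothetical names, task ids are plain strings, and owners are identified by thread name as in the `lockedTasksToStreamThreadOwner` map quoted in the review.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of purging locks whose owning thread is no longer alive.
final class ZombieLockPurger {

    // Removes every entry whose owner is not among the live thread names and
    // returns the task ids that were released. The map is modified in place.
    static Set<String> purgeOrphanedLocks(final Map<String, String> lockedTasksToOwner,
                                          final Set<String> liveThreadNames) {
        final Set<String> released = new TreeSet<>();
        final Iterator<Map.Entry<String, String>> it = lockedTasksToOwner.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (!liveThreadNames.contains(entry.getValue())) {
                released.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return released;
    }

    public static void main(final String[] args) {
        final Map<String, String> locks = new HashMap<>();
        locks.put("0_0", "StreamThread-1");
        locks.put("0_1", "StreamThread-2"); // StreamThread-2 died without unlocking
        locks.put("1_0", "StreamThread-2");

        final Set<String> live = new HashSet<>(Arrays.asList("StreamThread-1", "StreamThread-3"));
        System.out.println(purgeOrphanedLocks(locks, live)); // [0_1, 1_0]
        System.out.println(locks);                           // {0_0=StreamThread-1}
    }
}
```

In a real implementation the purge would have to run under the same monitor as `lock` and `unlock`; calling it from a dying thread's shutdown path, as the ticket suggests, would shorten the window compared with waiting for the periodic cleaner.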
[GitHub] [kafka] ableegoldman commented on pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#issuecomment-802542776

@wcarlson5 I ended up putting a minor guard against orphaned task directories into this PR. It's not perfect but basically we just let the cleaner thread verify that any locked task directories belong to one of the currently running StreamThreads. Let me know what you think
[GitHub] [kafka] ableegoldman commented on pull request #10355: KAFKA-12500: fix memory leak in thread cache
ableegoldman commented on pull request #10355:
URL: https://github.com/apache/kafka/pull/10355#issuecomment-802535398

Alright I added two integration tests, lmk if this looks good @vvcephei
[GitHub] [kafka] chia7712 commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API
chia7712 commented on a change in pull request #10275: URL: https://github.com/apache/kafka/pull/10275#discussion_r597383123 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiHandler.java ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public interface AdminApiHandler { + +/** + * Get a user-friendly name for the API this handler is implementing. + */ +String apiName(); + +/** + * Initialize the set of keys required to handle this API and how the driver + * should map them to the broker that will handle the request for these keys. + * + * Two mapping types are supported: + * + * - Static mapping: when the brokerId is known ahead of time + * - Dynamic mapping: when the brokerId must be discovered dynamically + * + * @return the key mappings + */ +KeyMappings initializeKeys(); + +/** + * Build the fulfillment request. 
The set of keys are derived during the Lookup stage + * as the set of keys which all map to the same destination broker. + * + * @param brokerId the target brokerId for the request + * @param keys the set of keys that should be handled by this request + * + * @return a builder for the request containing the given keys + */ +AbstractRequest.Builder buildRequest(Integer brokerId, Set keys); + +/** + * Callback that is invoked when a Fulfillment request returns successfully. + * The handler should parse the response, check for errors, and return a + * result which indicates which keys (if any) have either been completed or + * failed with an unrecoverable error. + * + * It is also possible that the response indicates an incorrect target brokerId + * (e.g. in the case of a NotLeader error when the request is bound for a partition + * leader). In this case the key will be "unmapped" from the target brokerId + * and lookup will be retried. + * + * Note that keys which received a retriable error should be left out of the + * result. They will be retried automatically. + * + * @param brokerId the brokerId that the associated request was sent to + * @param keys the set of keys from the associated request + * @param response the response received from the broker + * + * @return result indicating key complation, failure, and unmapping + */ +ApiResult handleResponse(Integer brokerId, Set keys, AbstractResponse response); + +class KeyMappings { +public final Optional> staticMapping; Review comment: Do you plan to add more fields to `StaticKeyMapping` and `DynamicKeyMapping` in the future? If not, it seems to me we can simplify the data structure. Empty staticMapping and staticMapping with empty map are identical in processing. Could we simplify this structure? for example, a pure map? Also, How about renaming `keys` (of `DynamicKeyMapping`) to `dynamicKeys` and then moving both variables to `KeyMappings`? 
## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java ## @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License
[jira] [Commented] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304611#comment-17304611 ]

Luke Chen commented on KAFKA-12495:
--

[~ramkrish1489], I checked your PR in KAFKA-10413 again and have updated my bug description. You fixed the case where a member leaves by doing a round-robin distribution across all the active members. Nice fix!

The issue I am trying to fix here occurs when a new member joins. That triggers 2 rounds of rebalance: 1 for revocation and 1 for re-assignment. The problem is in the 2nd round. What we do now is re-assign all the previously revoked C/T to the new members and end the rebalance. However, if 1 (or more) additional members join between the 1st and 2nd rounds, then in the 2nd round we spread the revoked C/T across all the new members, although they were meant to be re-assigned to only the "numberOfNewMember - 1" members.

Please check the example in the bug description again; it should make clear what I mean. Thank you.

> Unbalanced connectors/tasks distribution will happen in Connect's incremental
> cooperative assignor
> --
>
> Key: KAFKA-12495
> URL: https://issues.apache.org/jira/browse/KAFKA-12495
> Project: Kafka
> Issue Type: Bug
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
> Attachments: image-2021-03-18-15-04-57-854.png,
> image-2021-03-18-15-05-52-557.png, image-2021-03-18-15-07-27-103.png
>
>
> In Kafka Connect, we implement incremental cooperative rebalance algorithm
> based on KIP-415
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect].
> However, we have a bad assumption in the algorithm implementation, which is:
> after the revoking rebalance completes, the member (worker) count will be the
> same as in the previous round of rebalance.
>
> Let's take a look at the example in the KIP-415:
> !image-2021-03-18-15-07-27-103.png|width=441,height=556!
> It works well for most cases. But what if W4 is added after the 1st rebalance
> completed and before the 2nd rebalance started? Let's look at this example
> (we'll use 10 tasks here):
>
> {code:java}
> Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5
> W1 is current leader
> W2 joins with assignment: []
> Rebalance is triggered
> W3 joins while rebalance is still active with assignment: []
> W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5]
> W1 becomes leader
> W1 computes and sends assignments:
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT3, BT4, BT5])
> W2(delay: 0, assigned: [], revoked: [])
> W3(delay: 0, assigned: [], revoked: [])
> W1 stops revoked resources
> W1 rejoins with assignment: [AC0, AT1, AT2, AT3]
> Rebalance is triggered
> W2 joins with assignment: []
> W3 joins with assignment: []
> // one more member joined
> W4 joins with assignment: []
> W1 becomes leader
> W1 computes and sends assignments:
> // We assigned all the previously revoked Connectors/Tasks to the new members, which causes an unbalanced distribution
> W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [])
> W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: [])
> W3(delay: 0, assigned: [BT1, BT2, BT3], revoked: [])
> W4(delay: 0, assigned: [BT4, BT5], revoked: [])
> {code}
> We cannot just assign the revoked C/T to the new members in the 2nd round of
> rebalance. We should check if we need one more round of revocation.
>
> Note: The consumer's cooperative sticky assignor won't have this issue since
> we re-compute the assignment in each round.
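
The "check if we need one more round of revocation" step can be sketched as a per-worker capacity check. The code below is a simplified illustration and not the logic of Connect's incremental cooperative assignor: `RevocationCheck`, `ceilShare`, and `excessPerWorker` are hypothetical names, and connectors and tasks are counted together as one load figure per worker.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: decide whether any worker still holds more than its fair share.
final class RevocationCheck {

    // Largest load a worker should carry when the total is spread evenly: ceil(total / workers).
    static int ceilShare(final int total, final int workers) {
        return (total + workers - 1) / workers;
    }

    // For each overloaded worker, the number of connectors/tasks that would have
    // to be revoked. An empty result means no further revocation round is needed.
    static Map<String, Integer> excessPerWorker(final Map<String, Integer> currentLoad,
                                                final int totalConnectorsAndTasks) {
        final int cap = ceilShare(totalConnectorsAndTasks, currentLoad.size());
        final Map<String, Integer> excess = new TreeMap<>();
        for (final Map.Entry<String, Integer> entry : currentLoad.entrySet()) {
            final int over = entry.getValue() - cap;
            if (over > 0) {
                excess.put(entry.getKey(), over);
            }
        }
        return excess;
    }

    public static void main(final String[] args) {
        // State at the start of the 2nd round in the ticket's example:
        // W1 kept 4 C/T, W2 and W3 hold nothing yet, and W4 has just joined.
        final Map<String, Integer> load = new LinkedHashMap<>();
        load.put("W1", 4);
        load.put("W2", 0);
        load.put("W3", 0);
        load.put("W4", 0);

        // 12 C/T over 4 workers gives a cap of 3, so W1 is over by one.
        System.out.println(excessPerWorker(load, 12)); // {W1=1}
    }
}
```

With only three workers the cap is 4 and the result is empty, which matches the KIP-415 example where the second round can simply hand out the revoked C/T.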
[GitHub] [kafka] ableegoldman edited a comment on pull request #10356: KAFKA-12503: cache size set by the thread
ableegoldman edited a comment on pull request #10356:
URL: https://github.com/apache/kafka/pull/10356#issuecomment-802491798

I don't think we should aim to catch things in soak, although I'm not on the side of "we should never catch anything in soak/everything must have an integration test" either. It's just that this particular issue, and this feature in general really, calls for an integration test -- we should be flexing the cache resize codepath in a realistic scenario. I'm happy to leave that as followup work as long as we do get around to it.

This looks good and the build passed so I'll merge this to unblock the release
[jira] [Updated] (KAFKA-12495) Unbalanced connectors/tasks distribution will happen in Connect's incremental cooperative assignor
[ https://issues.apache.org/jira/browse/KAFKA-12495?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Luke Chen updated KAFKA-12495: -- Description: In Kafka Connect, we implement incremental cooperative rebalance algorithm based on KIP-415 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. However, we have a bad assumption in the algorithm implementation, which is: after revoking rebalance completed, the member(worker) count will be the same as the previous round of reblance. Let's take a look at the example in the KIP-415: !image-2021-03-18-15-07-27-103.png|width=441,height=556! It works well for most cases. But what if W4 added after 1st rebalance completed and before 2nd rebalance started? Let's see what will happened? Let's see this example: (we'll use 10 tasks here): {code:java} Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5 W1 is current leader W2 joins with assignment: [] Rebalance is triggered W3 joins while rebalance is still active with assignment: [] W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5] W1 becomes leader W1 computes and sends assignments: W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) W2(delay: 0, assigned: [], revoked: []) W3(delay: 0, assigned: [], revoked: []) W1 stops revoked resources W1 rejoins with assignment: [AC0, AT1, AT2, AT3] Rebalance is triggered W2 joins with assignment: [] W3 joins with assignment: [] // one more member joined W4 joins with assignment: [] W1 becomes leader W1 computes and sends assignments: // We assigned all the previous revoked Connectors/Tasks to the new member, which cause unbalanced distribution 
W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) W2(delay: 0, assigned: [AT4, AT5, BC0], revoked: []) W2(delay: 0, assigned: [BT1, BT2, BT4], revoked: []) W2(delay: 0, assigned: [BT4, BT5], revoked: []) {code} We cannot just assign the revoked C/T to the new members in the 2nd round of rebalance. We should check if we need one more round of revocation. Note: The consumer's cooperative sticky assignor won't have this issue since we re-compute the assignment in each round. was: In Kafka Connect, we implement incremental cooperative rebalance algorithm based on KIP-415 ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect]. However, we have a bad assumption in the algorithm implementation, which is: after revoking rebalance completed, the member(worker) count will be the same as the previous round of reblance. Let's take a look at the example in the KIP-415: !image-2021-03-18-15-07-27-103.png|width=441,height=556! It works well for most cases. But what if W3 left after 1st rebalance completed and before 2nd rebalance started? Let's see what will happened? 
Let's see this example: (we'll use 10 tasks here): {code:java} Initial group and assignment: W1([AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) Config topic contains: AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5 W1 is current leader W2 joins with assignment: [] Rebalance is triggered W3 joins while rebalance is still active with assignment: [] W1 joins with assignment: [AC0, AT1, AT2, AT3, AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5] W1 becomes leader W1 computes and sends assignments: W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5]) W2(delay: 0, assigned: [], revoked: []) W3(delay: 0, assigned: [], revoked: []) W1 stops revoked resources W1 rejoins with assignment: [AC0, AT1, AT2, AT3] Rebalance is triggered W2 joins with assignment: [] // W3 is down W3 doesn't join W1 becomes leader W1 computes and sends assignments: // We assigned all the previous revoked Connectors/Tasks to the new member, which cause unbalanced distribution W1(delay: 0, assigned: [AC0, AT1, AT2, AT3], revoked: []) W2(delay: 0, assigned: [AT4, AT5, BC0, BT1, BT2, BT4, BT4, BT5], revoked: []) {code} We cannot assume the member count after keeps the same right after revocation. Note: The consumer's cooperative sticky assignor won't have this issue since we re-compute the assignment in each round. > Unbalanced connectors/tasks distribution will happen in Connect's incremental > cooperative assignor > -- > > Key: KAFKA-12495 > URL:
[jira] [Comment Edited] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation
[ https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304601#comment-17304601 ] Zhang Jianguo edited comment on KAFKA-8608 at 3/19/21, 2:54 AM: [~LillianY] [~xmar] I meet the same issue. [2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to record follower 18's position 308399, and last sent HW since the replica is not recognized to be one of the assigned replicas 15,16 for partition Collect-gnodeb-24. Empty records will be returned for this partition. (kafka.server.ReplicaManager) After controller switched from 14 to 15, it looks like kafka became abnormal. It didn't work even if I restart brokers. *Logs of Broker 14* !image-2021-03-19-10-36-04-328.png! !image-2021-03-19-10-41-44-952.png! !image-2021-03-19-10-42-16-296.png! !image-2021-03-19-10-42-32-759.png! *producer LOG* !image-2021-03-19-10-41-03-203.png! *Consumer got timeout exception:* *!image-2021-03-19-10-39-24-728.png!* was (Author: alberyzjg): [~LillianY] I meet the same issue. [2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to record follower 18's position 308399, and last sent HW since the replica is not recognized to be one of the assigned replicas 15,16 for partition Collect-gnodeb-24. Empty records will be returned for this partition. (kafka.server.ReplicaManager) After controller switched from 14 to 15, it looks like kafka became abnormal. It didn't work even if I restart brokers. *Logs of Broker 14* !image-2021-03-19-10-36-04-328.png! !image-2021-03-19-10-41-44-952.png! !image-2021-03-19-10-42-16-296.png! !image-2021-03-19-10-42-32-759.png! *producer LOG* !image-2021-03-19-10-41-03-203.png! 
*Consumer got timeout exception:* *!image-2021-03-19-10-39-24-728.png!* > Broker shows WARN on reassignment partitions on new brokers: Replica LEO, > follower position & Cache truncation > -- > > Key: KAFKA-8608 > URL: https://issues.apache.org/jira/browse/KAFKA-8608 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.1.1 > Environment: Kafka 2.1.1 >Reporter: Di Campo >Priority: Minor > Labels: broker, reassign, repartition > Attachments: image-2021-03-19-10-36-04-328.png, > image-2021-03-19-10-39-24-728.png, image-2021-03-19-10-41-03-203.png, > image-2021-03-19-10-41-44-952.png, image-2021-03-19-10-42-16-296.png, > image-2021-03-19-10-42-32-759.png > > > I added two brokers (brokerId 4,5) to a 3-node (brokerId 1,2,3) cluster where > there were 32 topics and 64 partitions on each, replication 3. > Running reassigning partitions. > On each run, I can see the following WARN messages, but when the reassignment > partition process finishes, it all seems OK. ISR is OK (count is 3 in every > partition). > But I get the following messages types, one per partition: > > {code:java} > [2019-06-27 12:42:03,946] WARN [LeaderEpochCache visitors-0.0.1-10] New epoch > entry EpochEntry(epoch=24, startOffset=51540) caused truncation of > conflicting entries ListBuffer(EpochEntry(epoch=22, startOffset=51540)). > Cache now contains 5 entries. (kafka.server.epoch.LeaderEpochFileCache) {code} > -> This relates to cache, so I suppose it's pretty safe. > {code:java} > [2019-06-27 12:42:04,250] WARN [ReplicaManager broker=1] Leader 1 failed to > record follower 3's position 47981 since the replica is not recognized to be > one of the assigned replicas 1,2,5 for partition visitors-0.0.1-28. Empty > records will be returned for this partition. > (kafka.server.ReplicaManager){code} > -> This is scary. I'm not sure about the severity of this, but it looks like > it may be missing records? 
> {code:java} > [2019-06-27 12:42:03,709] WARN [ReplicaManager broker=1] While recording the > replica LEO, the partition visitors-0.0.1-58 hasn't been created. > (kafka.server.ReplicaManager){code} > -> Here, these partitions are created. > First of all - am I supposed to be missing data here? I am assuming I don't, > and so far I don't see traces of losing anything. > If so, I'm not sure what these messages are trying to say here. Should they > really be at WARN level? If so - should the message clarify better the > different risks involved? > > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation
[ https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304601#comment-17304601 ] Zhang Jianguo edited comment on KAFKA-8608 at 3/19/21, 2:42 AM: [~LillianY] I meet the same issue. [2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to record follower 18's position 308399, and last sent HW since the replica is not recognized to be one of the assigned replicas 15,16 for partition Collect-gnodeb-24. Empty records will be returned for this partition. (kafka.server.ReplicaManager) After controller switched from 14 to 15, it looks like kafka became abnormal. It didn't work even if I restart brokers. *Logs of Broker 14* !image-2021-03-19-10-36-04-328.png! !image-2021-03-19-10-41-44-952.png! !image-2021-03-19-10-42-16-296.png! !image-2021-03-19-10-42-32-759.png! *producer LOG* !image-2021-03-19-10-41-03-203.png! *Consumer got timeout exception:* *!image-2021-03-19-10-39-24-728.png!* was (Author: alberyzjg): [~LillianY] I meet the same issue. [2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to record follower 18's position 308399, and last sent HW since the replica is not recognized to be one of the assigned replicas 15,16 for partition Collect-gnodeb-24. Empty records will be returned for this partition. (kafka.server.ReplicaManager) After controller switched from 14 to 15, it looks like kafka became abnormal. It didn't work even if I restart brokers. !image-2021-03-19-10-36-04-328.png! !image-2021-03-19-10-37-35-183.png! !image-2021-03-19-10-38-11-280.png! !image-2021-03-19-10-38-22-154.png! 
*producer LOG* *!image-2021-03-19-10-38-38-396.png!* *Consumer got timeout exception:* *!image-2021-03-19-10-39-24-728.png!* > Broker shows WARN on reassignment partitions on new brokers: Replica LEO, > follower position & Cache truncation > -- > > Key: KAFKA-8608 > URL: https://issues.apache.org/jira/browse/KAFKA-8608 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.1.1 > Environment: Kafka 2.1.1 >Reporter: Di Campo >Priority: Minor > Labels: broker, reassign, repartition > Attachments: image-2021-03-19-10-36-04-328.png, > image-2021-03-19-10-39-24-728.png, image-2021-03-19-10-41-03-203.png, > image-2021-03-19-10-41-44-952.png, image-2021-03-19-10-42-16-296.png, > image-2021-03-19-10-42-32-759.png
[jira] [Commented] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation
[ https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304601#comment-17304601 ] Zhang Jianguo commented on KAFKA-8608: -- [~LillianY] I meet the same issue. [2021-03-18 17:37:30,799] WARN [ReplicaManager broker=15] Leader 15 failed to record follower 18's position 308399, and last sent HW since the replica is not recognized to be one of the assigned replicas 15,16 for partition Collect-gnodeb-24. Empty records will be returned for this partition. (kafka.server.ReplicaManager) After controller switched from 14 to 15, it looks like kafka became abnormal. It didn't work even if I restart brokers. !image-2021-03-19-10-36-04-328.png! !image-2021-03-19-10-37-35-183.png! !image-2021-03-19-10-38-11-280.png! !image-2021-03-19-10-38-22-154.png! *producer LOG* *!image-2021-03-19-10-38-38-396.png!* *Consumer got timeout exception:* *!image-2021-03-19-10-39-24-728.png!* > Broker shows WARN on reassignment partitions on new brokers: Replica LEO, > follower position & Cache truncation > -- > > Key: KAFKA-8608 > URL: https://issues.apache.org/jira/browse/KAFKA-8608 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.1.1 > Environment: Kafka 2.1.1 >Reporter: Di Campo >Priority: Minor > Labels: broker, reassign, repartition > Attachments: image-2021-03-19-10-36-04-328.png, > image-2021-03-19-10-39-24-728.png
[jira] [Updated] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation
[ https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhang Jianguo updated KAFKA-8608: - Attachment: image-2021-03-19-10-39-24-728.png > Broker shows WARN on reassignment partitions on new brokers: Replica LEO, > follower position & Cache truncation > -- > > Key: KAFKA-8608 > URL: https://issues.apache.org/jira/browse/KAFKA-8608 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.1.1 > Environment: Kafka 2.1.1 >Reporter: Di Campo >Priority: Minor > Labels: broker, reassign, repartition > Attachments: image-2021-03-19-10-36-04-328.png, > image-2021-03-19-10-39-24-728.png
[jira] [Resolved] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology
[ https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman resolved KAFKA-12503. Resolution: Fixed > Resizing the thread cache in a non thread safe way can cause records to be > redirected throughout the topology > - > > Key: KAFKA-12503 > URL: https://issues.apache.org/jira/browse/KAFKA-12503 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Blocker > Fix For: 2.8.0 > > > When a thread is added, removed or replaced, the cache is resized. The > resize of the thread cache was being performed by the thread initiating > these calls. This can cause a record to be redirected to the wrong > processor via the call to `evict` in the cache. The evict flushes records > downstream to the next processor after the cache. But if this happens on the wrong > thread, the wrong processor receives them. > This can cause 3 problems. > 1) When the owner finishes processing the record, it sets the current node to > null in the processor context, and this then causes the other processor to throw > an exception `StreamsException: Current node is unknown.`. > 2) Depending on the type, it can cause a class cast exception, as the record is > of a different type. Mostly this happened when the value types were different > inside of the map node from the toStream method. > 3) A silent issue is that it could cause data to be processed by the wrong node > and cause data corruption. We have not been able to confirm this last one, but > it is the most dangerous in many ways. -- This message was sent by Atlassian Jira (v8.3.4#803005)
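One general way to avoid evicting on the wrong thread is to let other threads only record the requested size, and have the owning thread apply it. The sketch below illustrates that pattern in plain Java. It is an assumption-laden toy, not the Kafka Streams code or the actual fix in the linked PR: `OwnerResizedCache` and all of its methods are hypothetical names, and the "cache" is just a queue of strings.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: a cache whose size can be requested by any thread,
// but whose entries are only ever evicted by the owning thread.
final class OwnerResizedCache {
    private static final long NO_REQUEST = -1L;

    private final AtomicLong requestedMaxEntries = new AtomicLong(NO_REQUEST);
    private final Deque<String> entries = new ArrayDeque<>();   // touched only by the owner
    private final List<String> evictedBy = new ArrayList<>();   // thread name per evicted entry
    private long maxEntries;

    OwnerResizedCache(long maxEntries) {
        this.maxEntries = maxEntries;
    }

    // Safe to call from any thread: records the request and touches no entries.
    void requestResize(long newMaxEntries) {
        requestedMaxEntries.set(newMaxEntries);
    }

    // Owner thread only.
    void put(String record) {
        entries.addLast(record);
        evictIfNeeded();
    }

    // Owner thread only, e.g. at the top of its processing loop.
    void applyPendingResize() {
        long requested = requestedMaxEntries.getAndSet(NO_REQUEST);
        if (requested != NO_REQUEST) {
            maxEntries = requested;
            evictIfNeeded();
        }
    }

    private void evictIfNeeded() {
        while (entries.size() > maxEntries) {
            entries.removeFirst();
            evictedBy.add(Thread.currentThread().getName());
        }
    }

    List<String> evictedBy() {
        return evictedBy;
    }

    // Demo: a second thread asks for a smaller cache; the eviction still runs on the caller.
    static List<String> demo() {
        OwnerResizedCache cache = new OwnerResizedCache(3);
        cache.put("a");
        cache.put("b");
        cache.put("c");
        Thread other = new Thread(() -> cache.requestResize(1), "resizer");
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        cache.applyPendingResize(); // runs on the owner (calling) thread
        return cache.evictedBy();
    }

    public static void main(String[] args) {
        System.out.println("evictions performed by: " + demo());
    }
}
```

In the demo, the `resizer` thread never touches the entries, so every eviction is attributed to the thread that owns the cache.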
[jira] [Updated] (KAFKA-8608) Broker shows WARN on reassignment partitions on new brokers: Replica LEO, follower position & Cache truncation
[ https://issues.apache.org/jira/browse/KAFKA-8608?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zhang Jianguo updated KAFKA-8608: - Attachment: image-2021-03-19-10-36-04-328.png > Broker shows WARN on reassignment partitions on new brokers: Replica LEO, > follower position & Cache truncation > -- > > Key: KAFKA-8608 > URL: https://issues.apache.org/jira/browse/KAFKA-8608 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 2.1.1 > Environment: Kafka 2.1.1 >Reporter: Di Campo >Priority: Minor > Labels: broker, reassign, repartition > Attachments: image-2021-03-19-10-36-04-328.png
[GitHub] [kafka] ableegoldman commented on pull request #10356: KAFKA-12503: cache size set by the thread
ableegoldman commented on pull request #10356: URL: https://github.com/apache/kafka/pull/10356#issuecomment-802493975 Merged and cherrypicked to 2.8 cc @vvcephei
[GitHub] [kafka] ableegoldman merged pull request #10356: KAFKA-12503: cache size set by the thread
ableegoldman merged pull request #10356: URL: https://github.com/apache/kafka/pull/10356
[GitHub] [kafka] ableegoldman commented on pull request #10356: KAFKA-12503: cache size set by the thread
ableegoldman commented on pull request #10356: URL: https://github.com/apache/kafka/pull/10356#issuecomment-802491798 I don't think we should aim to catch things in soak, although I'm not on the side of "we should never catch anything in soak/everything must have an integration test" either. This particular issue does seem appropriate for an integration test -- as I said concurrency bugs are hard to actually reproduce, but we should absolutely be flexing the cache resize codepath in an integration test. I'm happy to leave that as followup work as long as we do get around to it. This looks good and the build passed so I'll merge this to unblock the release
[jira] [Commented] (KAFKA-12507) java.lang.OutOfMemoryError: Direct buffer memory
[ https://issues.apache.org/jira/browse/KAFKA-12507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304600#comment-17304600 ] diehu commented on KAFKA-12507: --- We did not set the -XX:MaxDirectMemorySize, and its default value is equal to -Xmx=1G. In server.properties, we set num.network.threads=16, num.io.threads=6, and the socket.request.max.bytes is not set. > java.lang.OutOfMemoryError: Direct buffer memory > > > Key: KAFKA-12507 > URL: https://issues.apache.org/jira/browse/KAFKA-12507 > Project: Kafka > Issue Type: Bug > Components: core > Environment: kafka version: 2.0.1 > java version: > java version "1.8.0_211" > Java(TM) SE Runtime Environment (build 1.8.0_211-b12) > Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode) > the command we use to start kafka broker: > java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 > -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true > -XX:+ExplicitGCInvokesConcurrent >Reporter: diehu >Priority: Major > > Hi, we have three brokers in our kafka cluster, and we use scripts to send > data to kafka at a rate of about 3.6w eps. After about one month, we got the > OOM error: > {code:java} > [2021-01-09 17:12:24,750] ERROR Processor got uncaught exception. 
> (kafka.network.Processor)
> java.lang.OutOfMemoryError: Direct buffer memory
> at java.nio.Bits.reserveMemory(Bits.java:694)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104)
> at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
> at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
> at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:562)
> at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:498)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:427)
> at kafka.network.Processor.poll(SocketServer.scala:679)
> at kafka.network.Processor.run(SocketServer.scala:584)
> at java.lang.Thread.run(Thread.java:748){code}
> The Kafka server does not shut down, but it keeps logging this error. At the same time, data cannot be produced to the Kafka cluster, and consumers cannot consume data from it.
> We used the recommended Java parameter -XX:+ExplicitGCInvokesConcurrent, but it does not seem to help.
> Only restarting the Kafka cluster fixes this problem.
> -- This message was sent by Atlassian Jira (v8.3.4#803005)
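When investigating an error like this, it can help to watch how much direct buffer memory the JVM has actually reserved. The sketch below reads that figure through the standard `java.lang.management.BufferPoolMXBean`; it is a generic JVM illustration and not something taken from the issue. The class name `DirectBufferUsage` is hypothetical, and the 1 MiB allocation in `main` is only there to make the number move.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;

// Hypothetical helper: reports memory used by the JVM's "direct" buffer pool.
final class DirectBufferUsage {

    // Returns the bytes currently used by direct buffers, or -1 if the pool is not reported.
    static long directMemoryUsed() {
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        long before = directMemoryUsed();
        ByteBuffer held = ByteBuffer.allocateDirect(1 << 20); // keep a reference so it stays allocated
        long after = directMemoryUsed();
        System.out.println("direct memory before: " + before + " bytes");
        System.out.println("direct memory after:  " + after + " bytes (holding " + held.capacity() + ")");
    }
}
```

The same bean is exposed over JMX, so the value could also be sampled from a running broker without code changes, assuming JMX is enabled there.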
[jira] [Commented] (KAFKA-12507) java.lang.OutOfMemoryError: Direct buffer memory
[ https://issues.apache.org/jira/browse/KAFKA-12507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304593#comment-17304593 ] Wenbing Shen commented on KAFKA-12507: -- What values do you use for these parameters: -XX:MaxDirectMemorySize, num.network.threads, socket.request.max.bytes? We have encountered the same problem in a customer environment, where we set -XX:MaxDirectMemorySize=2G and num.network.threads=120. Later, I worked around the problem by calculating the real-time traffic and reducing num.network.threads. > java.lang.OutOfMemoryError: Direct buffer memory > > > Key: KAFKA-12507 > URL: https://issues.apache.org/jira/browse/KAFKA-12507 > Project: Kafka > Issue Type: Bug > Components: core > Environment: kafka version: 2.0.1 > java version: > java version "1.8.0_211" > Java(TM) SE Runtime Environment (build 1.8.0_211-b12) > Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode) > the command we use to start kafka broker: > java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 > -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true > -XX:+ExplicitGCInvokesConcurrent >Reporter: diehu >Priority: Major
[jira] [Commented] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest
[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304592#comment-17304592 ] hudeqi commented on KAFKA-12478: Thank you very much for your reply! I belong to the Kuaishou message-oriented middleware group, which is mainly responsible for the secondary development and customization of Kafka. Kafka is widely used throughout the company, and the case described in this issue was recently discovered by a business partner who is very sensitive to data loss. This shocked us: the company has a large number of topics, and adding partitions is a relatively high-frequency operation, but a considerable part of the business consumes with the "latest" setting. If the consumer client perceives the expansion later than the producer client, data will definitely be lost. For storage middleware, losing data is a serious problem. *Although this problem can be avoided by configuring "earliest", that is not elegant, and the company uses clients in many other languages, such as rdkafka, Go, Python, etc. We expect this to be transparent to the client without losing data. If the amount of topic data is large, "earliest" may also put some pressure on the Kafka servers, so we want to optimize the server logic to solve this case almost completely.* Looking forward to your reply! > Consumer group may lose data for newly expanded partitions when add > partitions for topic if the group is set to consume from the latest > --- > > Key: KAFKA-12478 > URL: https://issues.apache.org/jira/browse/KAFKA-12478 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.7.0 >Reporter: hudeqi >Priority: Blocker > Labels: patch > Original Estimate: 1,158h > Remaining Estimate: 1,158h > > This problem was exposed in our production environment: a topic is used to > produce monitoring data. 
*After expanding partitions, the consumer side of > the business reported that data was lost.* > After preliminary investigation, the lost data is all concentrated in the > newly expanded partitions. The reason is: when the topic is expanded, the > producer perceives the expansion first, and some data is written to the > newly expanded partitions. The consumer group perceives the expansion > later; after the rebalance is completed, the newly expanded partitions are > consumed from the latest offset if the group is set to consume from the latest. The > data written to the newly expanded partitions in the meantime is skipped and lost > by the consumer. > Setting the group to consume from the earliest is not a good option for a topic with a huge data > flow: on startup, the group would consume a large amount of historical data > from the brokers, which affects broker performance to a > certain extent. Therefore, *it is necessary to consume only these newly expanded partitions from > the earliest offset.* -- This message was sent by Atlassian Jira (v8.3.4#803005)
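The proposal in this description, starting only the newly expanded partitions from the earliest offset while leaving the group's reset policy alone for everything else, can be expressed as a small decision table. The sketch below is a plain-Java illustration of that decision only; it uses no Kafka client classes, and `NewPartitionStartSketch`, `StartFrom` and `plan` are hypothetical names. How a group would learn which partitions existed before the expansion is deliberately left as an input and is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch of the per-partition start decision described in the issue.
final class NewPartitionStartSketch {

    enum StartFrom { COMMITTED, EARLIEST, LATEST }

    // assigned: partitions handed to this consumer after the rebalance
    // knownBefore: partitions that existed before the topic was expanded (assumed to be known)
    // withCommittedOffset: partitions for which the group already has a committed offset
    // resetPolicy: the group's configured policy for partitions without a committed offset
    static Map<Integer, StartFrom> plan(Set<Integer> assigned,
                                        Set<Integer> knownBefore,
                                        Set<Integer> withCommittedOffset,
                                        StartFrom resetPolicy) {
        Map<Integer, StartFrom> plan = new TreeMap<>();
        for (int partition : assigned) {
            if (withCommittedOffset.contains(partition)) {
                plan.put(partition, StartFrom.COMMITTED);   // resume where the group left off
            } else if (!knownBefore.contains(partition)) {
                plan.put(partition, StartFrom.EARLIEST);    // newly expanded: do not skip early records
            } else {
                plan.put(partition, resetPolicy);           // pre-existing: follow the configured policy
            }
        }
        return plan;
    }

    public static void main(String[] args) {
        Set<Integer> assigned = new TreeSet<>(Arrays.asList(0, 1, 2, 3));
        Set<Integer> knownBefore = new HashSet<>(Arrays.asList(0, 1, 2));
        Set<Integer> committed = new HashSet<>(Arrays.asList(0, 1));
        System.out.println(plan(assigned, knownBefore, committed, StartFrom.LATEST));
    }
}
```

In a Java client, a decision like this could plausibly be applied from a `ConsumerRebalanceListener` by calling `seekToBeginning` on the partitions marked `EARLIEST`, though that client-side approach is exactly what the reporter would prefer to avoid in favor of a server-side change.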
[jira] [Issue Comment Deleted] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest
[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] hudeqi updated KAFKA-12478: --- Comment: was deleted (was: Thank you very much for your reply! I belong to Kuaishou Message-oriented middleware group, which is mainly responsible for the secondary development and personalized customization of kafka. Kafka is widely used throughout the company, and the case proposed by the issue was recently discovered by a business partner who is very sensitive to data. This shocked us, the company has a large number of topic, and add-partition is a relatively high-frequency operation, but a considerable part of business uses latest parameters. If the consumer client perceives the expansion lagging behind the producer client, data will be definitely lost. As a storage middleware, losing data must be a serious problem. *Although this problem can be avoided by config earliest, but it is not elegant, and the company uses clients in many other languages, such as rdkafka,go,python, etc. We expect to be transparent to the client without losing data, and if the amount of topic data is large. "earliest" may also put some pressure on the kafka servers, so we want to optimize the server logic to nearly completely solve this case.* Looking forward to your reply!) > Consumer group may lose data for newly expanded partitions when add > partitions for topic if the group is set to consume from the latest > --- > > Key: KAFKA-12478 > URL: https://issues.apache.org/jira/browse/KAFKA-12478 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.7.0 >Reporter: hudeqi >Priority: Blocker > Labels: patch > Original Estimate: 1,158h > Remaining Estimate: 1,158h
[jira] [Comment Edited] (KAFKA-12478) Consumer group may lose data for newly expanded partitions when add partitions for topic if the group is set to consume from the latest
[ https://issues.apache.org/jira/browse/KAFKA-12478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304285#comment-17304285 ] hudeqi edited comment on KAFKA-12478 at 3/19/21, 2:16 AM: -- Thank you very much for your reply! I belong to Kuaishou Message-oriented middleware group, which is mainly responsible for the secondary development and personalized customization of kafka. Kafka is widely used throughout the company, and the case proposed by the issue was recently discovered by a business partner who is very sensitive to data. This shocked us, the company has a large number of topic, and add-partition is a relatively high-frequency operation, but a considerable part of business uses latest parameters. If the consumer client perceives the expansion lagging behind the producer client, data will be definitely lost. As a storage middleware, losing data must be a serious problem. *Although this problem can be avoided by config earliest, but it is not elegant, and the company uses clients in many other languages, such as rdkafka,go,python, etc. We expect to be transparent to the client without losing data, and if the amount of topic data is large. "earliest" may also put some pressure on the kafka servers, so we want to optimize the server logic to nearly completely solve this case.* Looking forward to your reply! was (Author: hudeqi): Thank you very much for your reply! I belong to Kuaishou Message-oriented middleware group, which is mainly responsible for the secondary development and personalized customization of kafka. Kafka is widely used throughout the company, and the case proposed by the issue was recently discovered by a business partner who is very sensitive to data. This shocked us, the company has a large number of topic, and add-partition is a relatively high-frequency operation, but a considerable part of business uses latest parameters. 
If the consumer client perceives the expansion lagging behind the producer client, data will be definitely lost. As a storage middleware, losing data must be a serious problem. *Although this problem can be avoided by config earliest, but it is not elegant, and the company uses clients in many other languages, such as rdkafka,go,python, etc. We expect to be transparent to the client without losing data, and if the amount of topic data is large. "earliest" may also put some pressure on the kafka servers, so we want to optimize the server logic to nearly completely solve this case.* Looking forward to your reply! > Consumer group may lose data for newly expanded partitions when add > partitions for topic if the group is set to consume from the latest > --- > > Key: KAFKA-12478 > URL: https://issues.apache.org/jira/browse/KAFKA-12478 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 2.7.0 >Reporter: hudeqi >Priority: Blocker > Labels: patch > Original Estimate: 1,158h > Remaining Estimate: 1,158h > > This problem is exposed in our product environment: a topic is used to > produce monitoring data. *After expanding partitions, the consumer side of > the business reported that the data is lost.* > After preliminary investigation, the lost data is all concentrated in the > newly expanded partitions. The reason is: when the server expands, the > producer firstly perceives the expansion, and some data is written in the > newly expanded partitions. But the consumer group perceives the expansion > later, after the rebalance is completed, the newly expanded partitions will > be consumed from the latest if it is set to consume from the latest. Within a > period of time, the data of the newly expanded partitions is skipped and lost > by the consumer. 
> If it is not necessarily set to consume from the earliest for a huge data > flow topic when starts up, this will make the group consume historical data > from the broker crazily, which will affect the performance of brokers to a > certain extent. Therefore, *it is necessary to consume these partitions from > the earliest separately.* -- This message was sent by Atlassian Jira (v8.3.4#803005)
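The loss window described in this issue can be illustrated with a small calculation. This is only a sketch of the reasoning, not consumer client code: `NewPartitionResetDemo`, `ResetPolicy`, and both methods are hypothetical names, and the offsets are made-up inputs.

```java
final class NewPartitionResetDemo {
    // Mirrors the two auto.offset.reset choices discussed in the issue.
    enum ResetPolicy { EARLIEST, LATEST }

    // Offset at which a group with no committed offset starts reading
    // a partition it has just been assigned.
    static long startingOffset(ResetPolicy policy, long logStartOffset, long logEndOffset) {
        return policy == ResetPolicy.LATEST ? logEndOffset : logStartOffset;
    }

    // Records the producer wrote before the group noticed the new partition
    // and that the group will therefore never read.
    static long skippedRecords(ResetPolicy policy, long logStartOffset, long logEndOffset) {
        return startingOffset(policy, logStartOffset, logEndOffset) - logStartOffset;
    }
}
```

With `LATEST`, every record written to the new partition before the rebalance completes is skipped; with `EARLIEST`, none are, at the cost of reading older data on every partition that has no committed offset.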
[jira] [Resolved] (KAFKA-10697) Remove ProduceResponse.responses
[ https://issues.apache.org/jira/browse/KAFKA-10697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-10697. Fix Version/s: 3.0.0 Resolution: Fixed > Remove ProduceResponse.responses > > > Key: KAFKA-10697 > URL: https://issues.apache.org/jira/browse/KAFKA-10697 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chun-Hao Tang >Priority: Minor > Fix For: 3.0.0 > > > This is a follow-up of KAFKA-9628. > related discussion: > https://github.com/apache/kafka/pull/9401#discussion_r518984349 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] chia7712 merged pull request #10332: KAFKA-10697: Remove ProduceResponse.responses
chia7712 merged pull request #10332: URL: https://github.com/apache/kafka/pull/10332
[jira] [Created] (KAFKA-12507) java.lang.OutOfMemoryError: Direct buffer memory
diehu created KAFKA-12507: - Summary: java.lang.OutOfMemoryError: Direct buffer memory Key: KAFKA-12507 URL: https://issues.apache.org/jira/browse/KAFKA-12507 Project: Kafka Issue Type: Bug Components: core Environment: kafka version: 2.0.1 java version: java version "1.8.0_211" Java(TM) SE Runtime Environment (build 1.8.0_211-b12) Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode) the command we use to start kafka broker: java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -Djava.awt.headless=true -XX:+ExplicitGCInvokesConcurrent Reporter: diehu Hi, we have three brokers in our Kafka cluster, and we use scripts to send data to Kafka at a rate of about 36,000 events per second (3.6w eps). After about one month, we got the OOM error: {code:java} [2021-01-09 17:12:24,750] ERROR Processor got uncaught exception. (kafka.network.Processor) java.lang.OutOfMemoryError: Direct buffer memory at java.nio.Bits.reserveMemory(Bits.java:694) at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) at sun.nio.ch.IOUtil.read(IOUtil.java:195) at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) at org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:104) at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117) at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:562) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:498) at org.apache.kafka.common.network.Selector.poll(Selector.java:427) at kafka.network.Processor.poll(SocketServer.scala:679) at kafka.network.Processor.run(SocketServer.scala:584) at java.lang.Thread.run(Thread.java:748){code} The Kafka server does not shut down, but it keeps logging this error. At the same time, data cannot be produced to the cluster and consumers cannot consume from it. We used the recommended Java parameter -XX:+ExplicitGCInvokesConcurrent, but it does not seem to help. Only restarting the Kafka cluster fixes the problem. -- This message was sent by Atlassian Jira (v8.3.4#803005)
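When chasing an error like this one, it can help to watch how much direct buffer memory the JVM has reserved over time. The sketch below reads that figure from the standard `java.lang.management.BufferPoolMXBean`; the class `DirectBufferUsage` is a hypothetical helper and is not part of Kafka. To my understanding, when `-XX:MaxDirectMemorySize` is not set, HotSpot caps direct memory at roughly the maximum heap size, which would be about 1 GB with the `-Xmx1G` shown in the report, but the report itself does not confirm which limit was hit.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.List;

final class DirectBufferUsage {
    // Returns the bytes currently used by the JVM's "direct" buffer pool,
    // or -1 if the platform does not expose such a pool.
    static long directMemoryUsedBytes() {
        List<BufferPoolMXBean> pools =
            ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class);
        for (BufferPoolMXBean pool : pools) {
            if ("direct".equals(pool.getName())) {
                return pool.getMemoryUsed();
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        // Allocate 1 MiB off-heap so the pool is visibly non-empty.
        ByteBuffer buffer = ByteBuffer.allocateDirect(1 << 20);
        System.out.println("direct bytes in use: " + directMemoryUsedBytes());
        System.out.println("buffer capacity: " + buffer.capacity());
    }
}
```

The same pool is exposed over JMX, so a running broker can be sampled without code changes.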
[jira] [Commented] (KAFKA-12385) Remove FetchResponse#responseData
[ https://issues.apache.org/jira/browse/KAFKA-12385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304553#comment-17304553 ] Chia-Ping Tsai commented on KAFKA-12385: This issue is blocked as we are introducing topic id :( > Remove FetchResponse#responseData > - > > Key: KAFKA-12385 > URL: https://issues.apache.org/jira/browse/KAFKA-12385 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > reference to [https://github.com/apache/kafka/pull/9758#discussion_r584142074] > We can rewrite related code to avoid using stale data structure. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on pull request #10356: KAFKA-12503: cache size set by the thread
wcarlson5 commented on pull request #10356: URL: https://github.com/apache/kafka/pull/10356#issuecomment-802427967 I don't know about testing this via an integration test. I worry about introducing just another flaky test; these types of issues get caught mostly in soak. Even with the optimal conditions for it to go wrong, it took a couple of days. I will think about how to improve the tests, but that will happen in a different PR for sure.
[jira] [Commented] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304551#comment-17304551 ] A. Sophie Blee-Goldman commented on KAFKA-12506: cc [~wcarlson5] -- the first two items are probably a prerequisite for the test I described [here|https://github.com/apache/kafka/pull/10356#pullrequestreview-615974924], I'll try and remember to cross them off this list when we have something like that > Expand AdjustStreamThreadCountTest > -- > > Key: KAFKA-12506 > URL: https://issues.apache.org/jira/browse/KAFKA-12506 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: newbie, newbie++ > > Right now the AdjustStreamThreadCountTest runs a minimal topology that just > consumes a single input topic, and doesn't produce any data to this topic. > Some of the complex concurrency bugs that we've found only showed up when we > had some actual data to process and a stateful topology: > [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] > We should try to expand the Streams application in this test to really flex > the feature. Imo this would include > # Produce some data to the input topic > # One or more stateful operators in the topology, with caching enabled > # A repartition to get multiple subtopologies > # Write out some results to an output topic > # Verify that these results show up in the output -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12506: --- Description: Right now the AdjustStreamThreadCountTest runs a minimal topology that just consumes a single input topic, and doesn't produce any data to this topic. Some of the complex concurrency bugs that we've found only showed up when we had some actual data to process and a stateful topology: [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] We should try to expand the Streams application in this test to really flex the feature. Imo this would include # Produce some data to the input topic # One or more stateful operators in the topology, with caching enabled # A repartition to get multiple subtopologies # Write out some results to an output topic # Verify that these results show up in the output was: Right now the AdjustStreamThreadCountTest runs a minimal topology that just consumes a single input topic, and doesn't produce any data to this topic. Some of the complex concurrency bugs that we've found only showed up when we had some actual data to process and a stateful topology: [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] We should try to expand the Streams application in this test to really flex the feature. Imo this would include # One or more stateful operators in the topology, with caching enabled # A repartition to get multiple subtopologies # Writing to an output topic # Produce some data to the input topic and verify that some results show up in the output > Expand AdjustStreamThreadCountTest > -- > > Key: KAFKA-12506 > URL: https://issues.apache.org/jira/browse/KAFKA-12506 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: A. 
Sophie Blee-Goldman >Priority: Major > Labels: newbie, newbie++ > > Right now the AdjustStreamThreadCountTest runs a minimal topology that just > consumes a single input topic, and doesn't produce any data to this topic. > Some of the complex concurrency bugs that we've found only showed up when we > had some actual data to process and a stateful topology: > [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] > We should try to expand the Streams application in this test to really flex > the feature. Imo this would include > # Produce some data to the input topic > # One or more stateful operators in the topology, with caching enabled > # A repartition to get multiple subtopologies > # Write out some results to an output topic > # Verify that these results show up in the output -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-12506) Expand AdjustStreamThreadCountTest
A. Sophie Blee-Goldman created KAFKA-12506: -- Summary: Expand AdjustStreamThreadCountTest Key: KAFKA-12506 URL: https://issues.apache.org/jira/browse/KAFKA-12506 Project: Kafka Issue Type: Improvement Components: streams, unit tests Reporter: A. Sophie Blee-Goldman Right now the AdjustStreamThreadCountTest runs a minimal topology that just consumes a single input topic, and doesn't produce any data to this topic. Some of the complex concurrency bugs that we've found only showed up when we had some actual data to process and a stateful topology: [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] We should try to expand the Streams application in this test to really flex the feature. Imo this would include # One or more stateful operators in the topology, with caching enabled # A repartition to get multiple subtopologies # Writing to an output topic # Produce some data to the input topic and verify that some results show up in the output -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12506) Expand AdjustStreamThreadCountTest
[ https://issues.apache.org/jira/browse/KAFKA-12506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-12506: --- Labels: newbie newbie++ (was: ) > Expand AdjustStreamThreadCountTest > -- > > Key: KAFKA-12506 > URL: https://issues.apache.org/jira/browse/KAFKA-12506 > Project: Kafka > Issue Type: Improvement > Components: streams, unit tests >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: newbie, newbie++ > > Right now the AdjustStreamThreadCountTest runs a minimal topology that just > consumes a single input topic, and doesn't produce any data to this topic. > Some of the complex concurrency bugs that we've found only showed up when we > had some actual data to process and a stateful topology: > [KAFKA-12503|https://issues.apache.org/jira/browse/KAFKA-12503] > We should try to expand the Streams application in this test to really flex > the feature. Imo this would include > # One or more stateful operators in the topology, with caching enabled > # A repartition to get multiple subtopologies > # Writing to an output topic > # Produce some data to the input topic and verify that some results show up > in the output -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on pull request #10357: MINOR: KIP-500 Cluster ID is a String
hachikuji commented on pull request #10357: URL: https://github.com/apache/kafka/pull/10357#issuecomment-802417977 @vvcephei Where are we with the RC? Do we still have time to merge this to 2.8? We can do this after the release if necessary, but we'd prefer not to break the protocol if we don't have to.
[jira] [Updated] (KAFKA-12505) Should kafka-storage.sh accept a non-UUID for its --cluster-id parameter?
[ https://issues.apache.org/jira/browse/KAFKA-12505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12505: Labels: kip-500 (was: ) > Should kafka-storage.sh accept a non-UUID for its --cluster-id parameter? > - > > Key: KAFKA-12505 > URL: https://issues.apache.org/jira/browse/KAFKA-12505 > Project: Kafka > Issue Type: New Feature >Reporter: Ron Dagostino >Priority: Minor > Labels: kip-500 > > Should StorageTool support accepting non-UUIDs via its --cluster-id argument? > One purpose of the tool is to minimize the chance that a broker could use > data from the wrong volume (i.e. data from another cluster). Generating a > random UUID via the --random-uuid parameter encourages using a globally > unique value for every cluster and is consistent with the behavior today with > ZooKeeper, whereas allowing a non-UUID here would increase the chance that > someone could reuse a Cluster ID value across clusters and short-circuit the > risk mitigation that this tool provides. > Discuss... -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #10356: KAFKA-12503: cache size set by the thread
ableegoldman commented on pull request #10356: URL: https://github.com/apache/kafka/pull/10356#issuecomment-802411941 One unrelated test failure: `kafka.api.SaslGssapiSslEndToEndAuthorizationTest.testNoConsumeWithDescribeAclViaAssign`
[GitHub] [kafka] rondagostino commented on a change in pull request #10357: MINOR: KIP-500 Cluster ID is a String
rondagostino commented on a change in pull request #10357: URL: https://github.com/apache/kafka/pull/10357#discussion_r597326794 ## File path: core/src/main/scala/kafka/tools/StorageTool.scala ## @@ -198,7 +198,7 @@ object StorageTool extends Logging { s"does not appear to be a valid UUID: ${e.getMessage}") } require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.") -new MetaProperties(effectiveClusterId, config.nodeId) +new MetaProperties(effectiveClusterId.toString, config.nodeId) Review comment: > file a jira so we don't forget about it. https://issues.apache.org/jira/browse/KAFKA-12505 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12505) Should kafka-storage.sh accept a non-UUID for its --cluster-id parameter?
Ron Dagostino created KAFKA-12505: - Summary: Should kafka-storage.sh accept a non-UUID for its --cluster-id parameter? Key: KAFKA-12505 URL: https://issues.apache.org/jira/browse/KAFKA-12505 Project: Kafka Issue Type: New Feature Reporter: Ron Dagostino Should StorageTool support accepting non-UUIDs via its --cluster-id argument? One purpose of the tool is to minimize the chance that a broker could use data from the wrong volume (i.e. data from another cluster). Generating a random UUID via the --random-uuid parameter encourages using a globally unique value for every cluster and is consistent with the behavior today with ZooKeeper, whereas allowing a non-UUID here would increase the chance that someone could reuse a Cluster ID value across clusters and short-circuit the risk mitigation that this tool provides. Discuss... -- This message was sent by Atlassian Jira (v8.3.4#803005)
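The trade-off in this ticket is between a strict format check and accepting arbitrary strings. A strict check could look roughly like the sketch below. It assumes cluster IDs are 16-byte UUIDs written as unpadded URL-safe Base64, which is an assumption made for illustration and is not stated in the ticket; `ClusterIdCheck` and its methods are hypothetical and are not the `StorageTool` implementation.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.UUID;

final class ClusterIdCheck {
    // True if the string decodes, as URL-safe Base64, to exactly 16 bytes.
    static boolean looksLikeEncodedUuid(String clusterId) {
        try {
            byte[] decoded = Base64.getUrlDecoder().decode(clusterId);
            return decoded.length == 16;
        } catch (IllegalArgumentException e) {
            // Characters outside the URL-safe Base64 alphabet, or a bad length.
            return false;
        }
    }

    // Generates a random ID in the assumed format.
    static String randomClusterId() {
        UUID uuid = UUID.randomUUID();
        ByteBuffer bytes = ByteBuffer.allocate(16);
        bytes.putLong(uuid.getMostSignificantBits());
        bytes.putLong(uuid.getLeastSignificantBits());
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes.array());
    }
}
```

Dropping the check would let a value such as `my-cluster` through, which is convenient for upgraded clusters but makes accidental reuse of an ID across clusters more likely.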
[jira] [Commented] (KAFKA-12499) Adjust transaction timeout according to commit interval on Streams EOS
[ https://issues.apache.org/jira/browse/KAFKA-12499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304537#comment-17304537 ] A. Sophie Blee-Goldman commented on KAFKA-12499: Nice catch, but just wondering why choose 10x the commit interval? And should we set a lower bound as well so we fall back on the default of 1 minute if 10 * commit_interval is below that? For example with EOS that would mean a transaction timeout of just one second -- we only check whether it's time to commit after a loop of processing all tasks, if there are a large number of tasks or some heavy processing we'd just be constantly timing out. > Adjust transaction timeout according to commit interval on Streams EOS > -- > > Key: KAFKA-12499 > URL: https://issues.apache.org/jira/browse/KAFKA-12499 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 3.0.0 > > > The transaction timeout is set to 1 minute by default on Producer today, > while the commit interval on the other hand could be set to a very large > value, which makes the stream always hit transaction timeout and drop into > rebalance. We should increase the transaction timeout correspondingly when > commit interval is large. > On the other hand, broker could have a limit on the max transaction timeout > to be set. If we scale up client transaction timeout over the limit, stream > will fail due to INVALID_TRANSACTION_TIMEOUT. To alleviate this problem, > user could define their own customized transaction timeout to avoid hitting > the limit, so we should still respect what user configures in the override. > The new rule for configuring transaction timeout should look like: > 1. If transaction timeout is set in streams config, use it > 2. 
if not, transaction_timeout = max(default_transaction_timeout, 10 * > commit_interval) > Additionally if INVALID_TRANSACTION_TIMEOUT was thrown on Streams when > calling initTransaction(), we should wrap the exception to inform user that > their setting for commit interval could potentially be too high and should > adjust. -- This message was sent by Atlassian Jira (v8.3.4#803005)
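The two-step rule in the description can be written down directly. This is a minimal sketch of that rule only, not Kafka Streams code: `TransactionTimeoutRule` and `effectiveTimeoutMs` are hypothetical names, the 60,000 ms default is the "1 minute" mentioned in the ticket, and the factor of 10 is the multiplier the ticket proposes.

```java
import java.util.OptionalLong;

final class TransactionTimeoutRule {
    // Producer default cited in the ticket: 1 minute.
    static final long DEFAULT_TRANSACTION_TIMEOUT_MS = 60_000L;
    // Multiplier proposed in the ticket.
    static final long COMMIT_INTERVAL_MULTIPLIER = 10L;

    // Rule 1: an explicit user setting always wins.
    // Rule 2: otherwise take max(default, 10 * commit interval).
    static long effectiveTimeoutMs(OptionalLong userTimeoutMs, long commitIntervalMs) {
        if (userTimeoutMs.isPresent()) {
            return userTimeoutMs.getAsLong();
        }
        return Math.max(DEFAULT_TRANSACTION_TIMEOUT_MS,
                        COMMIT_INTERVAL_MULTIPLIER * commitIntervalMs);
    }
}
```

Because of the `max`, a 100 ms commit interval still yields the 60-second default rather than a 1-second timeout, so the default acts as the lower bound asked about in the comment.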
[GitHub] [kafka] hachikuji commented on a change in pull request #10357: MINOR: KIP-500 Cluster ID is a String
hachikuji commented on a change in pull request #10357: URL: https://github.com/apache/kafka/pull/10357#discussion_r597325275 ## File path: core/src/main/scala/kafka/tools/StorageTool.scala ## @@ -198,7 +198,7 @@ object StorageTool extends Logging { s"does not appear to be a valid UUID: ${e.getMessage}") } require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.") -new MetaProperties(effectiveClusterId, config.nodeId) +new MetaProperties(effectiveClusterId.toString, config.nodeId) Review comment: I'm ok with doing this later. Let's be sure to file a jira so we don't forget about it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #10357: MINOR: KIP-500 Cluster ID is a String
rondagostino commented on a change in pull request #10357: URL: https://github.com/apache/kafka/pull/10357#discussion_r597324486 ## File path: core/src/main/scala/kafka/tools/StorageTool.scala ## @@ -198,7 +198,7 @@ object StorageTool extends Logging { s"does not appear to be a valid UUID: ${e.getMessage}") } require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.") -new MetaProperties(effectiveClusterId, config.nodeId) +new MetaProperties(effectiveClusterId.toString, config.nodeId) Review comment: > Should we also remove the [StorageTool] logic above which requires that the clusterId parses as a UUID Basically, this is asking if `StorageTool` should support accepting non-UUIDs via its `--cluster-id` argument. One purpose of the tool is to minimize the chance that a broker could use data from the wrong volume (i.e. data from another cluster). Generating a random UUID via the `--random-uuid` parameter encourages using a globally unique value for every cluster and is consistent with the behavior today with ZooKeeper, whereas allowing a non-UUID here would increase the chance that someone could reuse a Cluster ID value across clusters and short-circuit the risk mitigation that this tool provides. Merging this PR now avoids an uncomfortable "rename" in `BrokerRegistrationRequest` (or a breaking change) after 2.8 is released; letting `StorageTool` accept non-UUIDs can be done at any time with no impact. Maybe we keep this PR about removing the constraint, which also simply cannot exist -- otherwise we can't upgrade clusters that don't use a UUID -- and leave this question about the `--cluster-id` parameter for a broader discussion later? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 commented on pull request #10356: KAFKA-12503: cache size set by the thread
wcarlson5 commented on pull request #10356: URL: https://github.com/apache/kafka/pull/10356#issuecomment-802393840 @guozhangwang @ableegoldman Can you take a look? @vvcephei I think this needs to be picked to 2.8 as well
[jira] [Updated] (KAFKA-12504) KafkaMetadataLog should check clean shutdown file
[ https://issues.apache.org/jira/browse/KAFKA-12504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-12504: Description: When a `Log` is opened, we will skip recovery if the clean shutdown file is present. This is specified as an optional parameter, which has a default value of `true`. Currently `KafkaMetadataLog` relies on this default instead of properly checking for clean shutdown, which means we will always skip recovery for this log. (was: When a `Log` is opened, we will skip recovery if the clean shutdown file is present. This is specified as an optional parameter, which has a default value of `true`. Currently `KafkaMetadataLog` relies on this default instead of properly checking for clean shutdown, which means we will always skip recovery for this og.) > KafkaMetadataLog should check clean shutdown file > - > > Key: KAFKA-12504 > URL: https://issues.apache.org/jira/browse/KAFKA-12504 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Labels: kip-500 > > When a `Log` is opened, we will skip recovery if the clean shutdown file is > present. This is specified as an optional parameter, which has a default > value of `true`. Currently `KafkaMetadataLog` relies on this default instead > of properly checking for clean shutdown, which means we will always skip > recovery for this log. -- This message was sent by Atlassian Jira (v8.3.4#803005)
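The fix described here amounts to checking for the marker file and passing the result explicitly instead of relying on the default of `true`. A minimal sketch, assuming the marker is a file named `.kafka_cleanshutdown` in the directory being checked (the file name and location are assumptions for illustration; `CleanShutdownCheck` and its methods are hypothetical and not Kafka code):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

final class CleanShutdownCheck {
    // Assumed marker file name, used here for illustration only.
    static final String CLEAN_SHUTDOWN_FILE = ".kafka_cleanshutdown";

    // True if the previous shutdown left the marker file behind.
    static boolean hadCleanShutdown(Path dir) {
        return Files.exists(dir.resolve(CLEAN_SHUTDOWN_FILE));
    }

    // Recovery may be skipped only when the marker is present.
    static boolean needsRecovery(Path dir) {
        return !hadCleanShutdown(dir);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("metadata-log-demo");
        System.out.println("needs recovery: " + needsRecovery(dir));
    }
}
```

The caller would then pass `hadCleanShutdown(dir)` wherever the optional parameter is currently left at its default.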
[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch
rohitrmd commented on a change in pull request #10276: URL: https://github.com/apache/kafka/pull/10276#discussion_r597309028 ## File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala ## @@ -485,6 +485,170 @@ final class KafkaMetadataLogTest { batchBuilder.build() } + @Test + def testValidateEpochGreaterThanLastKnownEpoch(): Unit = { +val log = buildMetadataLog(tempDir, mockTime) + +val numberOfRecords = 1 +val epoch = 1 + +append(log, numberOfRecords, epoch) + +val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1) +assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind) +assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch()) + } + + @Test + def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = { +val log = buildMetadataLog(tempDir, mockTime) + +val numberOfRecords = 10 +val epoch = 1 + +append(log, numberOfRecords, epoch) +log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) + +val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) +TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot => + snapshot.freeze() +} +assertTrue(log.deleteBeforeSnapshot(snapshotId)) + +val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1) +assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind) +assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch()) Review comment: @hachikuji made changes to reuse snapshotId variable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-10563) Make sure task directories don't remain locked by dead threads
[ https://issues.apache.org/jira/browse/KAFKA-10563?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman updated KAFKA-10563: --- Fix Version/s: (was: 2.8.0) > Make sure task directories don't remain locked by dead threads > -- > > Key: KAFKA-10563 > URL: https://issues.apache.org/jira/browse/KAFKA-10563 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > Most common/expected exceptions within Streams are handled gracefully, and > the thread will make sure to clean up all resources such as task locks during > shutdown. However, there are some instances where an unexpected exception > such as an IllegalStateException can leave some resources orphaned. > We have seen this happen to task directories after an IllegalStateException > is hit during the TaskManager's rebalance handling logic – the Thread shuts > down, but loses track of some tasks before unlocking them. This blocks any > further work on that task by any other thread in the same instance. > Previously we decided that this was "ok" because an IllegalStateException > means all bets are off. But with the upcoming work of KIP-663 and KIP-671, > users will be able to react smartly on dying threads and replace them with > new ones, making it more important than ever to ensure that the application > can continue on with no lasting repercussions of a thread death. If we allow > users to revive/replace a thread that dies due to IllegalStateException, that > thread should not be blocked from doing any work by the ghost of its > predecessor. > It might be easiest to just add some logic to the cleanup thread to verify > all the existing locks against the list of live threads, and remove any > zombie locks. 
But we probably want to do this purging more frequently than > the cleanup thread runs (10min by default) – so maybe we can leverage the > work in KIP-671 and have each thread purge any locks still owned by it after > the uncaught exception handler runs, but before the thread dies. -- This message was sent by Atlassian Jira (v8.3.4#803005)
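The purge proposed in KAFKA-10563 (verify existing locks against the list of live threads and drop any zombie locks) can be sketched roughly as follows. This is a minimal illustration under assumptions, not the Streams implementation: `TaskLockRegistry`, `purgeZombieLocks`, and the use of plain strings for task ids and thread names are all hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the in-memory lock bookkeeping; not a Kafka class.
final class TaskLockRegistry {
    // task id (e.g. "0_1") -> name of the thread that currently owns the lock
    private final Map<String, String> lockedTasksToOwner = new HashMap<>();

    // Returns true if the caller now owns (or already owned) the lock.
    synchronized boolean lock(final String taskId, final String threadName) {
        final String owner = lockedTasksToOwner.get(taskId);
        if (owner != null) {
            return owner.equals(threadName);
        }
        lockedTasksToOwner.put(taskId, threadName);
        return true;
    }

    // Drops every lock whose owner is not in the set of live threads
    // and returns how many locks were purged.
    synchronized int purgeZombieLocks(final Set<String> liveThreadNames) {
        final int before = lockedTasksToOwner.size();
        lockedTasksToOwner.values().removeIf(owner -> !liveThreadNames.contains(owner));
        return before - lockedTasksToOwner.size();
    }

    public static void main(final String[] args) {
        final TaskLockRegistry registry = new TaskLockRegistry();
        registry.lock("0_1", "StreamThread-1"); // StreamThread-1 later dies without unlocking
        final int purged = registry.purgeZombieLocks(Collections.singleton("StreamThread-2"));
        System.out.println("purged " + purged + " zombie lock(s)");
        System.out.println("replacement can lock: " + registry.lock("0_1", "StreamThread-2"));
    }
}
```

Whether such a purge runs from the cleanup thread or from each dying thread after the uncaught exception handler is the open design question in the ticket; the sketch only shows the bookkeeping.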
[jira] [Assigned] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology
[ https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12503: -- Assignee: Walker Carlson > Resizing the thread cache in a non thread safe way can cause records to be > redirected throughout the topology > - > > Key: KAFKA-12503 > URL: https://issues.apache.org/jira/browse/KAFKA-12503 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Assignee: Walker Carlson >Priority: Blocker > Fix For: 2.8.0 > > > When a thread is added, removed or replaced the cache is resized. When the > thread cache was resized it was being done so from the thread initiating > these calls. This can cause the record to be redirected to the wrong > processor via the call to `evict` in the cache. The evict flushes records > downstream to the next processor after the cache. But if this is on the wrong > thread the wrong processor receives them. > This can cause 3 problems. > 1) When the owner finishes processing the record it set the current node to > null in the processor context a this then causes the other processor to throw > an exception `StreamsException: Current node is unknown.`. > 2) Depending on the type it can cause a class cast exception as the record is > a different type. Mostly this happened when the value types were different > inside of the map node from the toStream method > 3) A silent issue is it could cause data to be processed by the wrong node > and cause data corruption. We have not been able to confirm this last one but > it is the most dangerous in many ways. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-12463: -- Description: Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. If no partition assignor is configured with a consumer, the {{RangeAssignor}} is used by default. Although there are some benefits to this assignor including stability of assignment across generations and simplicity of design, it comes with a major drawback: the number of active consumers in a group is limited to the number of partitions in the topic(s) with the most partitions. For an example of the worst case, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions. This can end up producing counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from some collection of topics with a total of N topic partitions, but some tasks end up idling and not processing any data. h3. Proposed Change *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will not work as consumers will still perform eager rebalancing as long as at least one of the partition assignors they are configured with does not support cooperative rebalancing. 
KAFKA-12487 should also be addressed before configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any sink connectors.* [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters." As Connect and the tooling around it matures and automatic restarts of failed tasks become more popular, care should be taken to ensure that the consumer group churn created by restarting one or more tasks doesn't compromise the availability of other tasks by forcing them to temporarily yield up all of their partitions just to reclaim them after a rebalance has completed. With that in mind, we should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task: {code:java} partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor {code} This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and then will switch over to the new {{CooperativeStickyAssignor}} automatically. 
Importantly, this setting will only be a default, and any user-specified overrides either in the *worker config*: {code:java} consumer.partition.assignment.strategy={code} or in the *connector config*: {code:java} "consumer.override.partition.assignment.strategy": ""{code} will still be respected. This improvement is viable as far back as -2.3- 2.4, when the {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug fix, should only be applied to the Connect framework in an upcoming minor release. h3. Workaround: manually setting the partition assignment strategy There is a simple workaround to achieve the same behavior in releases 2.4 and later that don't include this improvement: manually override either a connector configuration or an entire worker configuration. In order to avoid task failures while the connector is being reconfigured, it is highly recommended that the consumer be configured with a list of both the new and the current partition assignment strategies, instead of just the new
[jira] [Created] (KAFKA-12504) KafkaMetadataLog should check clean shutdown file
Jason Gustafson created KAFKA-12504: --- Summary: KafkaMetadataLog should check clean shutdown file Key: KAFKA-12504 URL: https://issues.apache.org/jira/browse/KAFKA-12504 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson When a `Log` is opened, we will skip recovery if the clean shutdown file is present. This is specified as an optional parameter, which has a default value of `true`. Currently `KafkaMetadataLog` relies on this default instead of properly checking for clean shutdown, which means we will always skip recovery for this log. -- This message was sent by Atlassian Jira (v8.3.4#803005)
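The pitfall described in KAFKA-12504 can be illustrated with a small sketch. Java has no default parameters, so the default is modelled here as an overload. Every name below, including the marker file name and where it lives, is an assumption made for illustration and not taken from the Kafka code.

```java
import java.io.File;

// Hypothetical illustration; none of these names come from the Kafka code base.
final class CleanShutdownCheck {
    // Assumed marker file name, used only for this sketch.
    static final String CLEAN_SHUTDOWN_MARKER = ".kafka_cleanshutdown";

    // True only if the marker file is actually present in the directory.
    static boolean hadCleanShutdown(final File dir) {
        return new File(dir, CLEAN_SHUTDOWN_MARKER).exists();
    }

    // Stand-in for opening a log: recovery is skipped when a clean shutdown is claimed.
    static String open(final File dir, final boolean hadCleanShutdown) {
        return hadCleanShutdown ? "recovery skipped" : "recovery ran";
    }

    // Models relying on a default of true: recovery is always skipped.
    static String openWithDefault(final File dir) {
        return open(dir, true);
    }

    // Models the fix: check for the marker explicitly before opening.
    static String openChecked(final File dir) {
        return open(dir, hadCleanShutdown(dir));
    }

    public static void main(final String[] args) {
        final File dir = new File("no-such-dir-for-clean-shutdown-sketch");
        System.out.println("default: " + openWithDefault(dir));
        System.out.println("checked: " + openChecked(dir));
    }
}
```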
[jira] [Updated] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-12463: -- Description: Kafka consumers have a pluggable [partition assignment interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] that comes with several out-of-the-box implementations including the [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], and [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. If no partition assignor is configured with a consumer, the {{RangeAssignor}} is used by default. Although there are some benefits to this assignor including stability of assignment across generations and simplicity of design, it comes with a major drawback: the number of active consumers in a group is limited to the number of partitions in the topic(s) with the most partitions. For an example of the worst case, in a consumer group where every member is subscribed to ten topics that each have one partition, only one member of that group will be assigned any topic partitions. This can end up producing counterintuitive and even frustrating behavior when a sink connector is brought up with N tasks to read from some collection of topics with a total of N topic partitions, but some tasks end up idling and not processing any data. h3. Proposed Change *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below will not work as consumers will still perform eager rebalancing as long as at least one of the partition assignors they are configured with does not support cooperative rebalancing. 
KAFKA-12487 should also be addressed before configuring any Connect worker to use the {{CooperativeStickyAssignor}} for any sink connectors.* [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] introduced the {{CooperativeStickyAssignor}}, which seeks to provide a stable assignment across generations wherever possible, provide the most even assignment possible (taking into account possible differences in subscriptions across consumers in the group), and allow consumers to continue processing data during rebalance. The documentation for the assignor states that "Users should prefer this assignor for newer clusters." We should alter the default consumer configuration for sink tasks to use the new {{CooperativeStickyAssignor}}. In order to do this in a backwards-compatible fashion that also enables rolling upgrades, this should be implemented by changing the {{Worker}} to set the following on the consumer configuration created for each sink connector task: {code:java} partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor {code} This way, consumer groups for sink connectors on Connect clusters in the process of being upgraded will continue to use the {{RangeAssignor}} until all workers in the cluster have been upgraded, and then will switch over to the new {{CooperativeStickyAssignor}} automatically. But, this setting will be overwritten by any user-specified {{consumer.partition.assignment.strategy}} property in the worker configuration, and by any user-specified {{consumer.override.partition.assignment.strategy}} property in a sink connector configuration when per-connector client overrides is enabled in the worker config with {{connector.client.config.override.policy=ALL}}. 
This improvement is viable as far back as -2.3- 2.4, when the {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug fix, should only be applied to the Connect framework in an upcoming minor release. h3. Manually setting the partition assignment strategy There is a simple workaround to achieve the same behavior in releases 2.4 and later that don't include this improvement: manually override either a connector configuration or an entire worker configuration. In order to avoid task failures while the connector is being reconfigured, it is highly recommended that the consumer be configured with a list of both the new and the current partition assignment strategies, instead of just the new partition assignment strategy. For example, to update a connector formerly configured to use the {{RangeAssignor}} strategy to instead use the {{RoundRobinAssignor}} strategy, add the following to the connector configuration: {code:java} "consumer.override.partition.assignment.strategy":
[jira] [Commented] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology
[ https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304523#comment-17304523 ] Guozhang Wang commented on KAFKA-12503: --- Thanks for the summary [~wcarlson5] [~ableegoldman]. I'm wondering if we should fix it along with https://issues.apache.org/jira/browse/KAFKA-12500 together by adding a separate function in the cache to just free the space corresponding to a thread that does not trigger eviction --- i.e. just clear the records in the buffer. Wondering what's your current proposal to fix. > Resizing the thread cache in a non thread safe way can cause records to be > redirected throughout the topology > - > > Key: KAFKA-12503 > URL: https://issues.apache.org/jira/browse/KAFKA-12503 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Priority: Blocker > Fix For: 2.8.0 > > > When a thread is added, removed or replaced the cache is resized. When the > thread cache was resized it was being done so from the thread initiating > these calls. This can cause the record to be redirected to the wrong > processor via the call to `evict` in the cache. The evict flushes records > downstream to the next processor after the cache. But if this is on the wrong > thread the wrong processor receives them. > This can cause 3 problems. > 1) When the owner finishes processing the record it set the current node to > null in the processor context a this then causes the other processor to throw > an exception `StreamsException: Current node is unknown.`. > 2) Depending on the type it can cause a class cast exception as the record is > a different type. Mostly this happened when the value types were different > inside of the map node from the toStream method > 3) A silent issue is it could cause data to be processed by the wrong node > and cause data corruption. We have not been able to confirm this last one but > it is the most dangerous in many ways. 
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304519#comment-17304519 ] Chris Egerton commented on KAFKA-12463: --- Thanks Randall-- {quote}I agree with you that we should fix the behavior. But the fix will appear only in certain releases, and not all users will be able to upgrade to those releases to get the fix. So, documenting the simple workaround will help users in those situations. I suggested documenting the workaround here to help any users that do stumble upon this issue when searching for a potential fix, regardless of whether the workaround is also documented elsewhere. {quote} That's an interesting take... I guess the difference I perceive here is that this is a proposed improvement, not really a fix. I don't know if any users are going to stumble onto this after something breaks since it doesn't address anything breaking to begin with. Either way, I guess we can try to iron out the workaround a little more here (will do now) but hopefully we can put something in the public-facing docs for Connect (maybe as part of upgrade notes if we change the default consumer partition assignment strategy) in addition to that; seems like that might get more of the target audience here. With regards to steps forward and your question, [~ableegoldman], I wasn't certain one way or the other about round robin vs cooperative sticky assignment. I had a few thoughts: * When the set of task configurations changes, the advantages of stickiness or the cooperative protocol are basically irrelevant since each task and its accompanying consumer is brought down and a new one is brought up in its place. * In a completely stable world where consumer assignment never changes, round robin would be ideal (out of the ones available right now) as it'd guarantee as-even-as-possible spread of partitions within the same topic across tasks. 
* Otherwise, the only times the cooperative protocol might come into play are: ** When a consumer subscription is updated (which would cause a consumer rebalance but keep all sink tasks running) ** When a task is started or shut down without the connector being reconfigured (which may happen when a single task fails, a failed task is restarted, a new worker joins the group, or an existing worker leaves the group) ** When the consumer for a task falls out of the group (likely because the task is taking too long to process data provided to it by the framework). * Under most of these scenarios, stickiness would provide no benefit as any time a consumer is created, a new task is brought up in its place. The only exception is an update to a consumer subscription, but even that would require some changes to how Connect invokes [SinkTask::open|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection-] and [SinkTask::close|https://kafka.apache.org/27/javadoc/org/apache/kafka/connect/sink/SinkTask.html#close-java.util.Collection-] to basically fake cooperative rebalancing in the way that's proposed in the [StickyAssignor docs|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html] under the "Impact on {{ConsumerRebalanceListener}}" section. The benefits of the sticky assignor seem pretty slim, and would require some one-off tooling from Connect to basically re-implement the cooperative protocol. This is why I'm personally not in favor of it, but would love to learn more if there's something I'm missing here. So with all that in mind, we can transform the question into whether it's important to favor any of the scenarios outlined above where cooperative rebalancing might be of some benefit, and if not, opt to use the round robin assignor. 
There's one that comes to mind that I think might be worth considering, and count in favor of using the cooperative sticky assignor: if there's any kind of tooling in place that restarts failed tasks automatically, there will be significant consumer churn as consumers may rapidly fall out of and join the group. I think this scenario is going to become more and more common as adoption of Connect increases and both it and the tooling around it mature, and as a result, I'm gently in favor of trying to use the {{CooperativeStickyAssignor}} now, or at least, when it becomes possible in the Connect framework once work on KAFKA-12477 and KAFKA-12487 completes. I raised this point on the PR but it bears repeating here: we might want to reason through this carefully since we only get one shot to do this kind of automated upgrade before the next one gets more complicated (the list of assignors will grow from one to two this time around; it'll either have to grow from two to three the next, or we'll have to risk breaking changes for users who skip an upgrade step). I raised the round robin assignor as an option
[jira] [Commented] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology
[ https://issues.apache.org/jira/browse/KAFKA-12503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304518#comment-17304518 ] A. Sophie Blee-Goldman commented on KAFKA-12503: Thanks for the writeup! If anyone reading this wants to check out the code for themselves, the methods and classes of note are (1) the ProcessorContext, which tracks a "current node" and forwards records to that node as they flow through the topology, (2) StreamTask#process where a StreamThread picks up the next record, sets the current node on the context to the SourceNode, processes the record through the subtopology, and then sets the current node back to null, and (3) TimestampedCacheFlushListener which is tied to a cache and the processor node immediately downstream of that cache. The listener receives records that have been evicted, saves the current node on the context as the `prevNode`, then sets the current node on the context to its own node, forwards the record, and resets the current node to `prevNode`. The `StreamsException: Current node is unknown` must have occurred because the StreamThread owning this task had finished processing the record and set the current node to null when another thread hit the injected exception and tried to resize the cache. When the evicted record ended up in TimestampedCacheFlushListener#apply, the listener saved "null" as its previous node, processed the record with its own node, and then reset it to null. If the other StreamThread had begun processing this task in the meantime, it would suddenly find its current node to be null and throw the exception we see. The ClassCastException is similar, but instead of the listener setting the current node to null while the StreamTask was in the middle of processing, it must have set the current node to some other node elsewhere in the topology. 
If this other node has different input/output types, we would run into a ClassCastException as it forwards eg a Long to the downstream node which was expecting a Change. Of course these are the race conditions under which we ran into visible symptoms, but as Walker mentioned the more dangerous possibility is all the other times when a foreign processor is inserted into the middle of the subtopology but the data types match and therefore no exception is thrown. In these cases we would be silently corrupting the data. Obviously one followup question we should ask is why it was so easy for one thread to process records that belong to another in the first place? We should consider some kind of safety mechanism to ensure single-threaded access to the ProcessorContext and task directories -- turns out the locking mechanism doesn't actually protect against multithreaded access unless we explicitly ask it whether we can lock it or not. > Resizing the thread cache in a non thread safe way can cause records to be > redirected throughout the topology > - > > Key: KAFKA-12503 > URL: https://issues.apache.org/jira/browse/KAFKA-12503 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Walker Carlson >Priority: Blocker > Fix For: 2.8.0 > > > When a thread is added, removed or replaced the cache is resized. When the > thread cache was resized it was being done so from the thread initiating > these calls. This can cause the record to be redirected to the wrong > processor via the call to `evict` in the cache. The evict flushes records > downstream to the next processor after the cache. But if this is on the wrong > thread the wrong processor receives them. > This can cause 3 problems. > 1) When the owner finishes processing the record it set the current node to > null in the processor context a this then causes the other processor to throw > an exception `StreamsException: Current node is unknown.`. 
> 2) Depending on the type it can cause a class cast exception as the record is > a different type. Mostly this happened when the value types were different > inside of the map node from the toStream method > 3) A silent issue is it could cause data to be processed by the wrong node > and cause data corruption. We have not been able to confirm this last one but > it is the most dangerous in many ways. -- This message was sent by Atlassian Jira (v8.3.4#803005)
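The interleaving described in the comment above (a flush listener that saves, sets, and restores the current node while the owning thread starts its next record) can be replayed deterministically in a single thread. This is a rough sketch under assumptions: `Context`, `beginFlush`, `endFlush`, and the node names are hypothetical, and `IllegalStateException` stands in for the `StreamsException` seen in the report.

```java
// Hypothetical single-threaded replay of the race; not the Streams classes.
final class CurrentNodeRace {
    // Minimal stand-in for a processor context that tracks a "current node".
    static final class Context {
        String currentNode; // null means no record is in flight

        String forward() {
            if (currentNode == null) {
                throw new IllegalStateException("Current node is unknown");
            }
            return currentNode;
        }
    }

    // First half of the listener: remember the previous node, install its own.
    static String beginFlush(final Context ctx, final String listenerNode) {
        final String prevNode = ctx.currentNode;
        ctx.currentNode = listenerNode;
        return prevNode;
    }

    // Second half of the listener: restore whatever node it saw at the start.
    static void endFlush(final Context ctx, final String prevNode) {
        ctx.currentNode = prevNode;
    }

    // Replays the interleaving and returns what the owning thread observes.
    static String simulateInterleaving() {
        final Context ctx = new Context();
        // Foreign thread: the owner is idle, so the listener saves prevNode == null.
        final String prevNode = beginFlush(ctx, "listener-node");
        // Owning thread: picks up the next record and sets the source node.
        ctx.currentNode = "source-node";
        // Foreign thread: finishes the flush and resets the node to the saved null.
        endFlush(ctx, prevNode);
        // Owning thread: tries to forward and finds no current node.
        try {
            return ctx.forward();
        } catch (final IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(final String[] args) {
        System.out.println(simulateInterleaving());
    }
}
```

If `endFlush` instead restored some other non-null node, the owner would forward to the wrong processor without any exception, which is the silent-corruption case called out in the ticket.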
[jira] [Created] (KAFKA-12503) Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology
Walker Carlson created KAFKA-12503: -- Summary: Resizing the thread cache in a non thread safe way can cause records to be redirected throughout the topology Key: KAFKA-12503 URL: https://issues.apache.org/jira/browse/KAFKA-12503 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.8.0 Reporter: Walker Carlson Fix For: 2.8.0 When a thread is added, removed or replaced, the cache is resized. The resize was being performed from the thread initiating these calls. This can cause a record to be redirected to the wrong processor via the call to `evict` in the cache. The evict flushes records downstream to the next processor after the cache. But if this happens on the wrong thread, the wrong processor receives them. This can cause 3 problems. 1) When the owner finishes processing the record it sets the current node to null in the processor context, and this then causes the other processor to throw an exception `StreamsException: Current node is unknown.`. 2) Depending on the type it can cause a class cast exception as the record is a different type. Mostly this happened when the value types were different inside of the map node from the toStream method. 3) A silent issue is it could cause data to be processed by the wrong node and cause data corruption. We have not been able to confirm this last one but it is the most dangerous in many ways. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji commented on a change in pull request #10357: MINOR: KIP-500 Cluster ID is a String
hachikuji commented on a change in pull request #10357: URL: https://github.com/apache/kafka/pull/10357#discussion_r597285632 ## File path: core/src/main/scala/kafka/server/BrokerMetadataCheckpoint.scala ## @@ -106,14 +105,8 @@ object MetaProperties { value.getOrElse(throw new RuntimeException(s"Failed to find required property $key.")) } - def requireClusterId(properties: RawMetaProperties): Uuid = { -val value = require(ClusterIdKey, properties.clusterId) -try { - Uuid.fromString(value) -} catch { - case e: Throwable => throw new RuntimeException(s"Failed to parse $ClusterIdKey property " + -s"as a UUID: ${e.getMessage}") -} + def requireClusterId(properties: RawMetaProperties): String = { Review comment: We may as well get rid of this method. We are already using `require(NodeIdKey, properties.nodeId)` above. ## File path: core/src/test/scala/kafka/server/BrokerMetadataCheckpointTest.scala ## @@ -81,21 +80,21 @@ class BrokerMetadataCheckpointTest { } @Test - def testMetaPropertiesDoesNotAllowHexEncodedUUIDs(): Unit = { + def testMetaPropertiesAllowsHexEncodedUUIDs(): Unit = { val properties = new RawMetaProperties() properties.version = 1 properties.clusterId = "7bc79ca1-9746-42a3-a35a-efb3cde44492" properties.nodeId = 1 -assertThrows(classOf[RuntimeException], () => MetaProperties.parse(properties)) +MetaProperties.parse(properties) Review comment: Can we assert something about the return value? For example, we could follow the pattern in `testCreateMetadataProperties`. ## File path: core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala ## @@ -32,7 +32,7 @@ class KafkaRaftServerTest { @Test def testSuccessfulLoadMetaProperties(): Unit = { -val clusterId = Uuid.randomUuid() +val clusterId = Uuid.randomUuid().toString Review comment: Probably doesn't matter too much, but perhaps we could hard-code all of these to some string such as "JgxuGe9URy-E-ceaL04lEw" from the other test. The way it's written after this change looks a little odd. 
## File path: core/src/main/scala/kafka/tools/StorageTool.scala ## @@ -198,7 +198,7 @@ object StorageTool extends Logging { s"does not appear to be a valid UUID: ${e.getMessage}") } require(config.nodeId >= 0, s"The node.id must be set to a non-negative integer.") -new MetaProperties(effectiveClusterId, config.nodeId) +new MetaProperties(effectiveClusterId.toString, config.nodeId) Review comment: Should we also remove the logic above which requires that the clusterId parses as a UUID? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on pull request #10342: URL: https://github.com/apache/kafka/pull/10342#issuecomment-802350667 @wcarlson5 that's a good question, and I totally agree we should be more careful about that. In theory we should always clean up all state including unlock task directories during a thread's shutdown, but it's possible something slips through and a task directory gets orphaned. I filed a ticket to address this a while back: see [KAFKA-10563](https://issues.apache.org/jira/browse/KAFKA-10563) That said, I think it's somewhat of an orthogonal concern and would prefer to do that in a separate PR to keep this one light. I don't think the changes here make that any better or worse -- if a dying thread orphaned some tasks, we weren't cleaning up the filesystem locks either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r597281724 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -477,15 +441,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { exception ); } finally { -try { -unlock(id); -} catch (final IOException exception) { -log.warn( -String.format("%s Swallowed the following exception during unlocking after deletion of obsolete " + -"state directory %s for task %s:", logPrefix(), dirName, id), -exception -); -} +unlock(id); Review comment: I think we need to keep that one because `Utils.delete` can throw IOException too -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
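The point of the review comment above can be sketched as follows: once the lock is purely in-memory, `unlock` no longer throws and can sit bare in the `finally` block, while the directory deletion still needs its own `IOException` handling. This is a minimal illustration under assumptions; `InMemoryTaskLocks`, `IoAction`, and `cleanTaskDir` are hypothetical names, and the delete step is passed in as a callback rather than calling `Utils.delete`.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of in-memory task locks; not the StateDirectory class.
final class InMemoryTaskLocks {
    // Callback standing in for a directory deletion that may fail with IOException.
    interface IoAction {
        void run() throws IOException;
    }

    // task id -> name of the owning thread
    private final Map<String, String> owners = new HashMap<>();

    // Re-entrant for the owning thread; fails if another thread holds the lock.
    synchronized boolean lock(final String taskId) {
        final String me = Thread.currentThread().getName();
        final String owner = owners.putIfAbsent(taskId, me);
        return owner == null || owner.equals(me);
    }

    // Releases the lock only if the calling thread owns it; never throws IOException.
    synchronized void unlock(final String taskId) {
        owners.remove(taskId, Thread.currentThread().getName());
    }

    synchronized boolean isLocked(final String taskId) {
        return owners.containsKey(taskId);
    }

    // Returns true if the directory was deleted; the lock is released either way.
    boolean cleanTaskDir(final String taskId, final IoAction deleteDir) {
        if (!lock(taskId)) {
            return false;
        }
        try {
            deleteDir.run();
            return true;
        } catch (final IOException e) {
            // Deletion can still fail, so this catch stays.
            return false;
        } finally {
            unlock(taskId); // no nested try/catch needed for an in-memory unlock
        }
    }

    public static void main(final String[] args) {
        final InMemoryTaskLocks locks = new InMemoryTaskLocks();
        final boolean deleted = locks.cleanTaskDir("0_1", () -> {
            throw new IOException("simulated delete failure");
        });
        System.out.println("deleted=" + deleted + ", stillLocked=" + locks.isLocked("0_1"));
    }
}
```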
[GitHub] [kafka] rondagostino commented on pull request #10357: MINOR: KIP-500 Cluster ID is a String
rondagostino commented on pull request #10357: URL: https://github.com/apache/kafka/pull/10357#issuecomment-802344461 This needs to be cherry-picked to 2.8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino opened a new pull request #10357: MINOR: KIP-500 Cluster ID is a String
rondagostino opened a new pull request #10357: URL: https://github.com/apache/kafka/pull/10357 It is possible that an existing Cluster ID in ZooKeeper is not convertible to a UUID. KIP-500 cannot constrain Cluster IDs to be convertible to a UUID and still expect such a cluster to be able to upgrade to KIP-500. This patch removes this UUID constraint and makes KIP-500 consistent with existing RPCs, which treat Cluster ID as a String. It is unlikely that an existing Cluster ID in ZooKeeper would not be convertible to a UUID because the Cluster ID is auto-generated in the form of a UUID and stored in ZooKeeper as soon as the first broker connects. However, there is nothing preventing someone from generating a Cluster ID that does not conform to the UUID format and manually storing/bootstrapping that into ZooKeeper before the first broker connects. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
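To see why a UUID constraint can reject a usable Cluster ID, consider the sketch below. It assumes the auto-generated form is a 16-byte value in unpadded URL-safe base64 (the shape of the `"JgxuGe9URy-E-ceaL04lEw"` string quoted in the review of this PR); that format assumption, and the names `ClusterIdParsing`, `looksLikeEncodedUuid`, and the error message, are illustrative rather than copied from the Kafka code.

```java
import java.util.Base64;

// Hypothetical sketch; not the MetaProperties implementation.
final class ClusterIdParsing {
    // True if the id decodes as URL-safe base64 to exactly 16 bytes (assumed UUID encoding).
    static boolean looksLikeEncodedUuid(final String clusterId) {
        try {
            return Base64.getUrlDecoder().decode(clusterId).length == 16;
        } catch (final IllegalArgumentException e) {
            return false; // characters or length outside the base64 alphabet rules
        }
    }

    // String-based requirement: the id only has to be present, not parseable.
    static String requireClusterId(final String raw) {
        if (raw == null || raw.isEmpty()) {
            throw new RuntimeException("Failed to find required property cluster.id.");
        }
        return raw;
    }

    public static void main(final String[] args) {
        final String generated = "JgxuGe9URy-E-ceaL04lEw";
        final String hexEncoded = "7bc79ca1-9746-42a3-a35a-efb3cde44492";
        System.out.println(generated + " parses: " + looksLikeEncodedUuid(generated));
        System.out.println(hexEncoded + " parses: " + looksLikeEncodedUuid(hexEncoded));
        System.out.println("accepted as String: " + requireClusterId(hexEncoded));
    }
}
```

Under this assumption a hex-encoded ID fails the parse check even though it is a perfectly good identifier, which is the situation the patch avoids by treating the Cluster ID as an opaque String.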
[GitHub] [kafka] hachikuji commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot
hachikuji commented on a change in pull request #10289: URL: https://github.com/apache/kafka/pull/10289#discussion_r597278840 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -814,6 +822,10 @@ private EndQuorumEpochResponseData handleEndQuorumEpochRequest( ) throws IOException { EndQuorumEpochRequestData request = (EndQuorumEpochRequestData) requestMetadata.data; +if (!hasValidClusterId(request)) { Review comment: I think a better way to do this is to modify `validateVoterOnlyRequest` and `validateLeaderOnlyRequest` so that we pass the clusterId. Then we can get rid of `getClusterId`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
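One way to read the suggestion above is that the cluster ID check moves into the shared validation method, so each handler no longer needs its own `hasValidClusterId` call. The sketch below is hypothetical: the class, the error enum, the epoch check, and the rule that a missing (null) cluster ID is accepted are all assumptions made for illustration, not the actual `KafkaRaftClient` code.

```java
import java.util.Optional;

// Hypothetical stand-in for the validation helpers discussed in the review.
final class RequestValidatorSketch {
    enum Error { INCONSISTENT_CLUSTER_ID, FENCED_LEADER_EPOCH }

    private final String localClusterId;
    private final int currentEpoch;

    RequestValidatorSketch(String localClusterId, int currentEpoch) {
        this.localClusterId = localClusterId;
        this.currentEpoch = currentEpoch;
    }

    // The cluster ID is passed in by the caller, so no per-request-type
    // getClusterId(...) lookup is needed.
    Optional<Error> validateVoterOnlyRequest(String requestClusterId, int requestEpoch) {
        // Assumption: a null cluster ID means "not provided" and is accepted.
        if (requestClusterId != null && !requestClusterId.equals(localClusterId)) {
            return Optional.of(Error.INCONSISTENT_CLUSTER_ID);
        }
        if (requestEpoch < currentEpoch) {
            return Optional.of(Error.FENCED_LEADER_EPOCH);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        RequestValidatorSketch validator = new RequestValidatorSketch("cluster-a", 5);
        System.out.println(validator.validateVoterOnlyRequest("cluster-b", 5));
        System.out.println(validator.validateVoterOnlyRequest(null, 5));
    }
}
```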
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r597278900 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -288,50 +277,31 @@ private String logPrefix() { * Get the lock for the {@link TaskId}s directory if it is available * @param taskId task id * @return true if successful - * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal */ -synchronized boolean lock(final TaskId taskId) throws IOException { +synchronized boolean lock(final TaskId taskId) { if (!hasPersistentStores) { return true; } -final File lockFile; -// we already have the lock so bail out here -final LockAndOwner lockAndOwner = locks.get(taskId); -if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) { -log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); -return true; -} else if (lockAndOwner != null) { -// another thread owns the lock -return false; -} - -try { -lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); -} catch (final ProcessorStateException e) { -// directoryForTask could be throwing an exception if another thread -// has concurrently deleted the directory -return false; -} - -final FileChannel channel; - -try { -channel = getOrCreateFileChannel(taskId, lockFile.toPath()); -} catch (final NoSuchFileException e) { -// FileChannel.open(..) could throw NoSuchFileException when there is another thread -// concurrently deleting the parent directory (i.e. the directory of the taskId) of the lock -// file, in this case we will return immediately indicating locking failed. 
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId); +if (lockOwner != null) { +if (lockOwner.equals(Thread.currentThread().getName())) { +log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); +// we already own the lock +return true; +} else { +// another thread owns the lock +return false; +} +} else if (!stateDir.exists()) { Review comment: Good point -- I put this in there because there was a test for this behavior, but actually that makes no sense. Will do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
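The in-memory ownership check in the diff above can be sketched in isolation. This is a simplified illustration, not the real `StateDirectory`: task IDs are plain strings, the `hasPersistentStores` and state directory checks are left out, and the two-argument `lock` overload is a hypothetical addition so the owner can be supplied explicitly.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of a task-to-owning-thread lock table.
final class TaskLockTable {
    private final Map<String, String> lockedTasksToOwner = new HashMap<>();

    // Lock on behalf of the calling thread, as in the diff.
    synchronized boolean lock(String taskId) {
        return lock(taskId, Thread.currentThread().getName());
    }

    // Hypothetical overload: the owner name is passed explicitly.
    synchronized boolean lock(String taskId, String threadName) {
        String owner = lockedTasksToOwner.get(taskId);
        if (owner != null) {
            // Re-entrant for the owner, refused for everyone else.
            return owner.equals(threadName);
        }
        lockedTasksToOwner.put(taskId, threadName);
        return true;
    }

    // Only the owner may release the lock; other callers are ignored.
    synchronized void unlock(String taskId, String threadName) {
        String owner = lockedTasksToOwner.get(taskId);
        if (owner != null && owner.equals(threadName)) {
            lockedTasksToOwner.remove(taskId);
        }
    }

    public static void main(String[] args) {
        TaskLockTable locks = new TaskLockTable();
        System.out.println(locks.lock("0_0", "thread-1"));
        System.out.println(locks.lock("0_0", "thread-2"));
    }
}
```

Because nothing touches the filesystem, `lock` in a model like this has no reason to throw `IOException`, which matches the signature change in the diff.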
[GitHub] [kafka] hachikuji commented on a change in pull request #10289: KAFKA-12440: ClusterId validation for Vote, BeginQuorum, EndQuorum and FetchSnapshot
hachikuji commented on a change in pull request #10289: URL: https://github.com/apache/kafka/pull/10289#discussion_r597274570 ## File path: checkstyle/suppressions.xml ## @@ -68,7 +68,7 @@ files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/> + files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|KafkaRaftClient).java"/> Review comment: I agree it is unfortunate. There are probably ways we can improve this. For example, this logic smells a little bit: ```java if (quorum.isLeader()) { logger.debug("Rejecting vote request {} with epoch {} since we are already leader on that epoch", request, candidateEpoch); voteGranted = false; } else if (quorum.isCandidate()) { logger.debug("Rejecting vote request {} with epoch {} since we are already candidate on that epoch", request, candidateEpoch); voteGranted = false; } else if (quorum.isResigned()) { logger.debug("Rejecting vote request {} with epoch {} since we have resigned as candidate/leader in this epoch", request, candidateEpoch); voteGranted = false; } else if (quorum.isFollower()) { FollowerState state = quorum.followerStateOrThrow(); logger.debug("Rejecting vote request {} with epoch {} since we already have a leader {} on that epoch", request, candidateEpoch, state.leaderId()); voteGranted = false; ``` It might be possible to push this logic into `EpochState` or at least to make use of the `name()` method in the logging. @dengziming would you be interested in following up on this separately? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
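The `name()` idea from the review comment above could collapse the four branches into one. The sketch below is an assumption-laden illustration: the `EpochState` interface here is a hypothetical one-method stand-in, and the message wording is invented rather than taken from Kafka.

```java
// Hypothetical sketch; only the name() method is taken from the review comment.
final class VoteRejectionSketch {

    // Stand-in for the real EpochState, reduced to the one method used here.
    interface EpochState {
        String name();
    }

    // One message template instead of one branch per state.
    static String rejectionMessage(EpochState state, int candidateEpoch) {
        return String.format(
            "Rejecting vote request with epoch %d since we are already in state %s on that epoch",
            candidateEpoch, state.name());
    }

    public static void main(String[] args) {
        EpochState leader = () -> "Leader";
        System.out.println(rejectionMessage(leader, 7));
    }
}
```

A follower-specific detail such as the current leader ID would still need its own handling, for example by letting each state contribute extra context to the message.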
[GitHub] [kafka] hachikuji commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch
hachikuji commented on a change in pull request #10276: URL: https://github.com/apache/kafka/pull/10276#discussion_r597260954 ## File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala ## @@ -485,6 +485,170 @@ final class KafkaMetadataLogTest { batchBuilder.build() } + @Test + def testValidateEpochGreaterThanLastKnownEpoch(): Unit = { +val log = buildMetadataLog(tempDir, mockTime) + +val numberOfRecords = 1 +val epoch = 1 + +append(log, numberOfRecords, epoch) + +val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1) +assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind) +assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch()) + } + + @Test + def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = { +val log = buildMetadataLog(tempDir, mockTime) + +val numberOfRecords = 10 +val epoch = 1 + +append(log, numberOfRecords, epoch) +log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) + +val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) +TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot => + snapshot.freeze() +} +assertTrue(log.deleteBeforeSnapshot(snapshotId)) + +val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1) +assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind) +assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch()) Review comment: What I mean is just using the variable that we already have defined. ```scala val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) ... 
assertEquals(snapshotId, resultOffsetAndEpoch.offsetAndEpoch()) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] wcarlson5 opened a new pull request #10356: cache size set by the thread
wcarlson5 opened a new pull request #10356: URL: https://github.com/apache/kafka/pull/10356 Make it so threads do not directly resize other threads' caches ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12463) Update default consumer partition assignor for sink tasks
[ https://issues.apache.org/jira/browse/KAFKA-12463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304452#comment-17304452 ] A. Sophie Blee-Goldman commented on KAFKA-12463: Out of curiosity, why use the RoundRobinAssignor rather than the StickyAssignor if you want to avoid cooperative? I'm not a Connect expert so maybe there's some nuance here, but the StickyAssignor is generally preferred (we were planning to make that the default in 3.0 before the CooperativeStickyAssignor came along) > Update default consumer partition assignor for sink tasks > - > > Key: KAFKA-12463 > URL: https://issues.apache.org/jira/browse/KAFKA-12463 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > > Kafka consumers have a pluggable [partition assignment > interface|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html] > that comes with several out-of-the-box implementations including the > [RangeAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html], > > [RoundRobinAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html], > > [StickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html], > and > [CooperativeStickyAssignor|https://kafka.apache.org/27/javadoc/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html]. > If no partition assignor is configured with a consumer, the {{RangeAssignor}} > is used by default. Although there are some benefits to this assignor > including stability of assignment across generations and simplicity of > design, it comes with a major drawback: the number of active consumers in a > group is limited to the number of partitions in the topic(s) with the most > partitions. 
For an example of the worst case, in a consumer group where every > member is subscribed to ten topics that each have one partition, only one > member of that group will be assigned any topic partitions. > This can end up producing counterintuitive and even frustrating behavior when > a sink connector is brought up with N tasks to read from some collection of > topics with a total of N topic partitions, but some tasks end up idling and > not processing any data. > h3. Proposed Change > *NOTE: Until/unless KAFKA-12477 is addressed, the approach outlined below > will not work as consumers will still perform eager rebalancing as long as at > least one of the partition assignors they are configured with does not > support cooperative rebalancing. KAFKA-12487 should also be addressed before > configuring any Connect worker to use the {{CooperativeStickyAssignor}} for > any sink connectors.* > [KIP-429|https://cwiki.apache.org/confluence/display/KAFKA/KIP-429%3A+Kafka+Consumer+Incremental+Rebalance+Protocol] > introduced the {{CooperativeStickyAssignor}}, which seeks to provide a > stable assignment across generations wherever possible, provide the most even > assignment possible (taking into account possible differences in > subscriptions across consumers in the group), and allow consumers to continue > processing data during rebalance. The documentation for the assignor states > that "Users should prefer this assignor for newer clusters." > We should alter the default consumer configuration for sink tasks to use the > new {{CooperativeStickyAssignor}}. 
In order to do this in a > backwards-compatible fashion that also enables rolling upgrades, this should > be implemented by changing the {{Worker}} to set the following on the > consumer configuration created for each sink connector task: > {code:java} > partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RangeAssignor > {code} > This way, consumer groups for sink connectors on Connect clusters in the > process of being upgraded will continue to use the {{RangeAssignor}} until > all workers in the cluster have been upgraded, and then will switch over to > the new {{CooperativeStickyAssignor}} automatically. But, this setting will > be overwritten by any user-specified > {{consumer.partition.assignment.strategy}} property in the worker > configuration, and by any user-specified > {{consumer.override.partition.assignment.strategy}} property in a sink > connector configuration when per-connector client overrides is enabled in the > worker config with {{connector.client.config.override.policy=ALL}}. > This improvement is viable as far back as -2.3- 2.4, when the > {{CooperativeStickyAssignor}} was introduced, but given that it is not a bug > fix, should only
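The worst case described in the ticket above (ten single-partition topics, only one member assigned anything) can be reproduced with a simplified model of per-topic range assignment. This is a sketch under stated assumptions, not the real `RangeAssignor`: every consumer is assumed to subscribe to every topic, only partition counts are tracked, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of range assignment.
final class RangeAssignmentSketch {

    // Returns how many partitions each consumer ends up with.
    static Map<String, Integer> countAssignments(List<String> consumers,
                                                 Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, Integer> counts = new TreeMap<>();
        for (String consumer : sorted) {
            counts.put(consumer, 0);
        }
        if (sorted.isEmpty()) {
            return counts;
        }
        // Each topic is split independently, always starting from the first consumer.
        for (int numPartitions : partitionsPerTopic.values()) {
            int base = numPartitions / sorted.size();
            int extra = numPartitions % sorted.size();
            for (int i = 0; i < sorted.size(); i++) {
                int assigned = base + (i < extra ? 1 : 0);
                counts.merge(sorted.get(i), assigned, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>();
        for (int i = 0; i < 10; i++) {
            topics.put("topic-" + i, 1);
        }
        List<String> tasks = new ArrayList<>();
        tasks.add("task-0");
        tasks.add("task-1");
        tasks.add("task-2");
        System.out.println(countAssignments(tasks, topics));
    }
}
```

Because the split restarts at the first consumer for every topic, the single partition of each topic always goes to the same member, which is the idle-task behaviour the ticket describes for sink connectors.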
[GitHub] [kafka] ableegoldman commented on pull request #10355: KAFKA-12500: fix memory leak in thread cache
ableegoldman commented on pull request #10355: URL: https://github.com/apache/kafka/pull/10355#issuecomment-802298961 @vvcephei yeah I'm on it, just wanted to see if there were any concerns about the proposed fix first. I'll ping you again when it's ready -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12502) Quorum controller should return topic configs in CreateTopic response
Jason Gustafson created KAFKA-12502: --- Summary: Quorum controller should return topic configs in CreateTopic response Key: KAFKA-12502 URL: https://issues.apache.org/jira/browse/KAFKA-12502 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Configs were added to the response in version 5. {code} { "name": "Configs", "type": "[]CreatableTopicConfigs", "versions": "5+", "nullableVersions": "5+", "ignorable": true, "about": "Configuration of the topic.", "fields": [ { "name": "Name", "type": "string", "versions": "5+", "about": "The configuration name." }, { "name": "Value", "type": "string", "versions": "5+", "nullableVersions": "5+", "about": "The configuration value." }, { "name": "ReadOnly", "type": "bool", "versions": "5+", "about": "True if the configuration is read-only." }, { "name": "ConfigSource", "type": "int8", "versions": "5+", "default": "-1", "ignorable": true, "about": "The configuration source." }, { "name": "IsSensitive", "type": "bool", "versions": "5+", "about": "True if this configuration is sensitive." } ]} {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ijuma commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check
ijuma commented on pull request #10354: URL: https://github.com/apache/kafka/pull/10354#issuecomment-802273720 Also cherry-picked to 2.8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #10351: MINOR: use new method to get number of topics in DeleteTopicsRequest
jolshan commented on pull request #10351: URL: https://github.com/apache/kafka/pull/10351#issuecomment-802271057 Build was broken due to an unrelated change, but should be fixed now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-12501) KafkaClusterTestKit should support mixed-mode testing
[ https://issues.apache.org/jira/browse/KAFKA-12501?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur updated KAFKA-12501: - Description: Follow-on from https://github.com/apache/kafka/pull/10220 We should allow for mixed-mode testing with TestKit. Currently, brokers and controllers nodes are run in their respective modes only (where each has only one process role). It would be nice to allow for "mixed-mode" where one node has both the broker and controller role. This would help us write tests that verify that all the various listeners are working properly on a single node running both roles. Additionally, it would be nice to support observer controllers along with voting controllers As [~hachikuji] [pointed out|https://github.com/apache/kafka/pull/10220#discussion_r588571846], we could probably eliminate some of the manual node building in the testkit code by using KafkaRaftServer directly. was: Follow-on from https://github.com/apache/kafka/pull/10220 We should allow for mixed-mode testing with TestKit. Currently, brokers and controllers nodes are run in their respective modes only (where each has only one process role). It would be nice to allow for "mixed-mode" where one node has both the broker and controller role. This would help us write tests that verify that all the various listeners are working properly on a single node running both roles. As [~hachikuji] [pointed out|https://github.com/apache/kafka/pull/10220#discussion_r588571846], we could probably eliminate some of the manual node building in the testkit code by using KafkaRaftServer directly. > KafkaClusterTestKit should support mixed-mode testing > - > > Key: KAFKA-12501 > URL: https://issues.apache.org/jira/browse/KAFKA-12501 > Project: Kafka > Issue Type: Bug >Reporter: David Arthur >Priority: Minor > > Follow-on from https://github.com/apache/kafka/pull/10220 > We should allow for mixed-mode testing with TestKit. 
Currently, brokers and > controllers nodes are run in their respective modes only (where each has only > one process role). > It would be nice to allow for "mixed-mode" where one node has both the broker > and controller role. This would help us write tests that verify that all the > various listeners are working properly on a single node running both roles. > Additionally, it would be nice to support observer controllers along with > voting controllers > As [~hachikuji] [pointed > out|https://github.com/apache/kafka/pull/10220#discussion_r588571846], we > could probably eliminate some of the manual node building in the testkit code > by using KafkaRaftServer directly. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #10355: KAFKA-12500: fix memory leak in thread cache
ableegoldman commented on a change in pull request #10355: URL: https://github.com/apache/kafka/pull/10355#discussion_r597217235 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) { return Optional.empty(); } -// Returns the number of threads that are not in the DEAD state -- use this over threads.size() +// Returns the number of threads that are not in the DEAD or PENDING_SHUTDOWN state -- use this over threads.size() private int getNumLiveStreamThreads() { final AtomicInteger numLiveThreads = new AtomicInteger(0); synchronized (threads) { processStreamThread(thread -> { if (thread.state() == StreamThread.State.DEAD) { +log.debug("Trimming thread {} from the threads list since it's state is {}", thread.getName(), StreamThread.State.DEAD); threads.remove(thread); +} else if (thread.state() == StreamThread.State.PENDING_SHUTDOWN) { Review comment: You can file an improvement ticket and list some ideas for how to handle this (eg scheduler that periodically skims the `threads` list and resizes the cache when it finds any DEAD threads to trim) but I'm optimistic this won't be an issue for now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
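The follow-up idea mentioned above, a scheduler that periodically skims the `threads` list and resizes the cache when it finds DEAD threads, might look roughly like this. Everything here is hypothetical: the `Worker` type, the resize callback, and the class itself are illustration only and do not exist in Kafka Streams.

```java
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical periodic trimmer for a shared list of worker threads.
final class DeadThreadTrimmer {
    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    static final class Worker {
        final String name;
        volatile State state;

        Worker(String name, State state) {
            this.name = name;
            this.state = state;
        }
    }

    private final List<Worker> threads;       // guarded by synchronized (threads)
    private final IntConsumer resizeCallback; // receives the remaining thread count

    DeadThreadTrimmer(List<Worker> threads, IntConsumer resizeCallback) {
        this.threads = threads;
        this.resizeCallback = resizeCallback;
    }

    // Removes DEAD workers and triggers a resize only if something was removed.
    int trimDeadThreads() {
        synchronized (threads) {
            int before = threads.size();
            threads.removeIf(worker -> worker.state == State.DEAD);
            int removed = before - threads.size();
            if (removed > 0) {
                resizeCallback.accept(threads.size());
            }
            return removed;
        }
    }

    // Runs the trim at a fixed rate on the supplied executor.
    ScheduledFuture<?> schedule(ScheduledExecutorService executor, long periodMs) {
        return executor.scheduleAtFixedRate(this::trimDeadThreads, periodMs, periodMs, TimeUnit.MILLISECONDS);
    }
}
```

A caller would create a single-threaded scheduled executor, pass it to `schedule`, and shut it down together with the application.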
[GitHub] [kafka] ableegoldman commented on a change in pull request #10355: KAFKA-12500: fix memory leak in thread cache
ableegoldman commented on a change in pull request #10355: URL: https://github.com/apache/kafka/pull/10355#discussion_r597215093 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) { return Optional.empty(); } -// Returns the number of threads that are not in the DEAD state -- use this over threads.size() +// Returns the number of threads that are not in the DEAD or PENDING_SHUTDOWN state -- use this over threads.size() private int getNumLiveStreamThreads() { final AtomicInteger numLiveThreads = new AtomicInteger(0); synchronized (threads) { processStreamThread(thread -> { if (thread.state() == StreamThread.State.DEAD) { +log.debug("Trimming thread {} from the threads list since it's state is {}", thread.getName(), StreamThread.State.DEAD); threads.remove(thread); +} else if (thread.state() == StreamThread.State.PENDING_SHUTDOWN) { Review comment: Yep -- but imo this should be pretty rare. To hit an OOM you would need to have a relatively huge cache, and few total threads (so that the cache size per thread is still large), and then you'd need the new thread to start up, rebalance to get tasks, finish initializing and restoring them and then start consuming enough data to fill up the cache all before the old thread gets around to closing its state stores. Sure, threads can sometimes hang during termination but in my experience that's usually after the store closure (and hopefully hanging is itself rare). My point is, this should be good enough for now, and I'd like to stick with the simplest solution this late in the game. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working
mumrah commented on a change in pull request #10220: URL: https://github.com/apache/kafka/pull/10220#discussion_r597214017 ## File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java ## @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.testkit; + +import kafka.raft.KafkaRaftManager; +import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; +import kafka.server.Server; +import kafka.server.BrokerServer; +import kafka.server.ControllerServer; +import kafka.server.MetaProperties; +import kafka.tools.StorageTool; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.ThreadUtils; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.controller.Controller; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metalog.MetaLogManager; +import org.apache.kafka.raft.RaftConfig; +import org.apache.kafka.raft.metadata.MetaLogRaftShim; +import org.apache.kafka.raft.metadata.MetadataRecordSerde; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + + +@SuppressWarnings("deprecation") // Needed for 
Scala 2.12 compatibility +public class KafkaClusterTestKit implements AutoCloseable { +private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class); + +/** + * This class manages a future which is completed with the proper value for + * controller.quorum.voters once the randomly assigned ports for all the controllers are + * known. + */ +private static class ControllerQuorumVotersFutureManager implements AutoCloseable { +private final int expectedControllers; +private final CompletableFuture<List<String>> future = new CompletableFuture<>(); +private final Map<Integer, Integer> controllerPorts = new TreeMap<>(); + +ControllerQuorumVotersFutureManager(int expectedControllers) { +this.expectedControllers = expectedControllers; +} + +synchronized void registerPort(int nodeId, int port) { +controllerPorts.put(nodeId, port); +if (controllerPorts.size() >= expectedControllers) { +future.complete(controllerPorts.entrySet().stream(). +map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())). +collect(Collectors.toList())); +} +} + +void fail(Throwable e) { +future.completeExceptionally(e); +} + +@Override +public void close() { +future.cancel(true); +} +} + +public static class Builder { +private TestKitNodes nodes; +private Map<String, String> configProps = new HashMap<>(); + +public Builder(TestKitNodes nodes) { +this.nodes = nodes; +} + +public Builder setConfigProp(String key, String value) { +this.configProps.put(key, value); +return this; +} + +public KafkaClusterTestKit build() throws Exception { +Map controllers = new HashMap<>(); +Map kip500Brokers = new
[GitHub] [kafka] ijuma merged pull request #10354: MINOR: Exclude KIP-500.md from rat check
ijuma merged pull request #10354: URL: https://github.com/apache/kafka/pull/10354 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check
ijuma commented on pull request #10354: URL: https://github.com/apache/kafka/pull/10354#issuecomment-802259895 Merged to unbreak the build, we can improve further in future PRs. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12385) Remove FetchResponse#responseData
[ https://issues.apache.org/jira/browse/KAFKA-12385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304418#comment-17304418 ] Chun-Hao Tang commented on KAFKA-12385: --- [~chia7712] Can I work on this? > Remove FetchResponse#responseData > - > > Key: KAFKA-12385 > URL: https://issues.apache.org/jira/browse/KAFKA-12385 > Project: Kafka > Issue Type: Bug >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > reference to [https://github.com/apache/kafka/pull/9758#discussion_r584142074] > We can rewrite related code to avoid using stale data structure. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] wcarlson5 commented on a change in pull request #10355: KAFKA-12500: fix memory leak in thread cache
wcarlson5 commented on a change in pull request #10355: URL: https://github.com/apache/kafka/pull/10355#discussion_r597208785 ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -971,6 +972,7 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) { synchronized (changeThreadCount) { threadIdx = getNextThreadIndex(); cacheSizePerThread = getCacheSizePerThread(getNumLiveStreamThreads() + 1); +log.info("Adding a new StreamThread with thread id {}; new cache size per thread is {}", threadIdx, cacheSizePerThread); Review comment: Maybe log number of threads here too? ## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ## @@ -1097,13 +1102,17 @@ private int getNumStreamThreads(final boolean hasGlobalTopology) { return Optional.empty(); } -// Returns the number of threads that are not in the DEAD state -- use this over threads.size() +// Returns the number of threads that are not in the DEAD or PENDING_SHUTDOWN state -- use this over threads.size() private int getNumLiveStreamThreads() { final AtomicInteger numLiveThreads = new AtomicInteger(0); synchronized (threads) { processStreamThread(thread -> { if (thread.state() == StreamThread.State.DEAD) { +log.debug("Trimming thread {} from the threads list since it's state is {}", thread.getName(), StreamThread.State.DEAD); threads.remove(thread); +} else if (thread.state() == StreamThread.State.PENDING_SHUTDOWN) { Review comment: are we risking a memory overflow with this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on pull request #10355: KAFKA-12500: fix memory leak in thread cache
ableegoldman commented on pull request #10355: URL: https://github.com/apache/kafka/pull/10355#issuecomment-802252651 @wcarlson5 @cadonna @lct45 Should be cherrypicked to 2.8 cc @vvcephei
[GitHub] [kafka] ableegoldman opened a new pull request #10355: KAFKA-12500: fix memory leak in thread cache
ableegoldman opened a new pull request #10355: URL: https://github.com/apache/kafka/pull/10355 We need to exclude threads in `PENDING_SHUTDOWN` from the live-thread count used to compute the new cache size per thread. This PR also adds some logging to help follow what happens when a thread is added, removed, or replaced.
[GitHub] [kafka] mumrah commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working
mumrah commented on a change in pull request #10220: URL: https://github.com/apache/kafka/pull/10220#discussion_r597202938 ## File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java ## @@ -0,0 +1,499 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.testkit; + +import kafka.raft.KafkaRaftManager; +import kafka.server.KafkaConfig; +import kafka.server.KafkaConfig$; +import kafka.server.Server; +import kafka.server.BrokerServer; +import kafka.server.ControllerServer; +import kafka.server.MetaProperties; +import kafka.tools.StorageTool; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.utils.ThreadUtils; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.controller.Controller; +import org.apache.kafka.metadata.ApiMessageAndVersion; +import org.apache.kafka.metalog.MetaLogManager; +import org.apache.kafka.raft.RaftConfig; +import org.apache.kafka.raft.metadata.MetaLogRaftShim; +import org.apache.kafka.raft.metadata.MetadataRecordSerde; +import org.apache.kafka.test.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.collection.JavaConverters; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.TreeMap; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + + +@SuppressWarnings("deprecation") // Needed for 
Scala 2.12 compatibility +public class KafkaClusterTestKit implements AutoCloseable { +private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class); + +/** + * This class manages a future which is completed with the proper value for + * controller.quorum.voters once the randomly assigned ports for all the controllers are + * known. + */ +private static class ControllerQuorumVotersFutureManager implements AutoCloseable { +private final int expectedControllers; +private final CompletableFuture> future = new CompletableFuture<>(); +private final Map controllerPorts = new TreeMap<>(); + +ControllerQuorumVotersFutureManager(int expectedControllers) { +this.expectedControllers = expectedControllers; +} + +synchronized void registerPort(int nodeId, int port) { +controllerPorts.put(nodeId, port); +if (controllerPorts.size() >= expectedControllers) { +future.complete(controllerPorts.entrySet().stream(). +map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())). +collect(Collectors.toList())); +} +} + +void fail(Throwable e) { +future.completeExceptionally(e); +} + +@Override +public void close() { +future.cancel(true); +} +} + +public static class Builder { +private TestKitNodes nodes; +private Map configProps = new HashMap<>(); + +public Builder(TestKitNodes nodes) { +this.nodes = nodes; +} + +public Builder setConfigProp(String key, String value) { +this.configProps.put(key, value); +return this; +} + +public KafkaClusterTestKit build() throws Exception { +Map controllers = new HashMap<>(); +Map kip500Brokers = new
[jira] [Created] (KAFKA-12501) KafkaClusterTestKit should support mixed-mode testing
David Arthur created KAFKA-12501: Summary: KafkaClusterTestKit should support mixed-mode testing Key: KAFKA-12501 URL: https://issues.apache.org/jira/browse/KAFKA-12501 Project: Kafka Issue Type: Bug Reporter: David Arthur Follow-on from https://github.com/apache/kafka/pull/10220 We should allow for mixed-mode testing with TestKit. Currently, broker and controller nodes are run in their respective modes only (where each has only one process role). It would be nice to allow for "mixed-mode" where one node has both the broker and controller role. This would help us write tests that verify that all the various listeners are working properly on a single node running both roles. As [~hachikuji] [pointed out|https://github.com/apache/kafka/pull/10220#discussion_r588571846], we could probably eliminate some of the manual node building in the testkit code by using KafkaRaftServer directly.
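For illustration only, here is a rough sketch of what a mixed-mode node's configuration might look like when built in test code. The class and method names are hypothetical and are not part of KafkaClusterTestKit; the sketch assumes the KRaft config keys `process.roles`, `node.id`, and `controller.quorum.voters`, and reuses the `id@localhost:port` voter format that appears in the testkit code from pull request #10220.

```java
import java.util.Properties;

// Hypothetical helper, not part of KafkaClusterTestKit.
final class MixedModeConfigSketch {

    // Builds properties for a single node that takes both process roles.
    static Properties mixedModeProps(int nodeId, int controllerPort) {
        Properties props = new Properties();
        // Both roles on one node is what the ticket calls "mixed-mode".
        props.setProperty("process.roles", "broker,controller");
        props.setProperty("node.id", Integer.toString(nodeId));
        // Single-voter quorum pointing at this node's controller listener.
        props.setProperty("controller.quorum.voters",
            String.format("%d@localhost:%d", nodeId, controllerPort));
        return props;
    }

    public static void main(String[] args) {
        mixedModeProps(1, 9093).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

A real testkit change would also need listener settings and randomly assigned ports; those are left out here.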
[GitHub] [kafka] hachikuji commented on a change in pull request #10275: KAFKA-12434; Admin support for `DescribeProducers` API
hachikuji commented on a change in pull request #10275: URL: https://github.com/apache/kafka/pull/10275#discussion_r597193539 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiLookupStrategy.java ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin.internals; + +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.AbstractResponse; + +import java.util.Collections; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; + +public interface AdminApiLookupStrategy { + +/** + * Define the scope of a given key for lookup. Key lookups are complicated + * by the need to accommodate different batching mechanics. For example, + * a `Metadata` request supports arbitrary batching of topic partitions in + * order to discover partitions leaders. This can be supported by returning + * a single scope object for all keys. + * + * On the other hand, `FindCoordinator` request only supports lookup of a + * single key. This can be supported by returning a different scope object + * for each lookup key. 
+ * + * @param key the lookup key + * + * @return request scope indicating how lookup requests can be batched together + */ +RequestScope lookupScope(T key); + +/** + * Build the lookup request for a set of keys. The grouping of the keys is controlled + * through {@link #lookupScope(Object)}. In other words, each set of keys that map + * to the same request scope object will be sent to this method. + * + * @param keys the set of keys that require lookup + * + * @return a builder for the lookup request + */ +AbstractRequest.Builder buildRequest(Set keys); + +/** + * Callback that is invoked when a lookup request returns successfully. The handler + * should parse the response, check for errors, and return a result indicating + * which keys were mapped to a brokerId successfully and which keys received + * a fatal error (e.g. a topic authorization failure). + * + * Note that keys which receive a retriable error should be left out of the + * result. They will be retried automatically. For example, if the response of + * `FindCoordinator` request indicates an unavailable coordinator, then the key + * should be left out of the result so that the request will be retried. + * + * @param keys the set of keys from the associated request + * @param response the response received from the broker + * + * @return a result indicating which keys mapped successfully to a brokerId and + * which encountered a fatal error + */ +LookupResult handleResponse(Set keys, AbstractResponse response); + +class LookupResult { +public final Map mappedKeys; +public final Map failedKeys; + +public LookupResult( +Map failedKeys, +Map mappedKeys +) { +this.failedKeys = Collections.unmodifiableMap(failedKeys); +this.mappedKeys = Collections.unmodifiableMap(mappedKeys); +} +} + +interface RequestScope { +default OptionalInt destinationBrokerId() { +return OptionalInt.empty(); +} +} Review comment: I decided to pull this up to the top level. Let me know if that seems ok. 
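The batching behaviour described in the `lookupScope` javadoc above can be sketched roughly as follows. This is a minimal illustration, not the code from the pull request: `LookupScopeSketch`, its stand-in `RequestScope` interface, and `groupByScope` are hypothetical names, and the sketch assumes scope objects are compared by identity.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical sketch; the real driver logic in the PR may differ.
final class LookupScopeSketch {

    // Stand-in for the RequestScope interface quoted in the review.
    interface RequestScope { }

    // Groups keys by the scope object returned for each key. Every group
    // would then be passed to a single buildRequest(Set<T>) call.
    static <T> Map<RequestScope, Set<T>> groupByScope(
            Collection<T> keys, Function<T, RequestScope> lookupScope) {
        Map<RequestScope, Set<T>> batches = new LinkedHashMap<>();
        for (T key : keys) {
            batches.computeIfAbsent(lookupScope.apply(key), s -> new LinkedHashSet<>())
                   .add(key);
        }
        return batches;
    }
}
```

Returning one shared scope object for every key (the `Metadata` case) yields a single batch, while returning a fresh scope object per key (the `FindCoordinator` case) yields one request per key.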
[jira] [Assigned] (KAFKA-12500) Cache memory is leaked after removing/replacing a StreamThread
[ https://issues.apache.org/jira/browse/KAFKA-12500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reassigned KAFKA-12500: -- Assignee: A. Sophie Blee-Goldman > Cache memory is leaked after removing/replacing a StreamThread > -- > > Key: KAFKA-12500 > URL: https://issues.apache.org/jira/browse/KAFKA-12500 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: A. Sophie Blee-Goldman >Priority: Blocker > Fix For: 2.8.0 > > > We currently leak the memory given to a StreamThread for its cache after it > gets shut down due to being removed or replaced. > Removal: > If the StreamThread is removing itself, or if a thread is being removed by an > external caller but fails to shut down within the allotted time, it won't be > removed from the `threads` list to avoid freeing up its thread id while > still running. If the thread hasn't reached the DEAD state when we resize the > cache, the new cache size per thread is computed based on the number of live > threads, which includes the removed thread at this point. > Replacement: > When a thread is replaced, we first shut down that thread but hold off on > removing it from the threads list until it's DEAD. Immediately after the > shutdown we call addStreamThread to start up a new thread, which resizes the > cache according to num-live-threads + 1. > In both of these cases, the cache memory of the shutting down thread won't be > made available to the remaining threads. > Note: in general this leak may not be permanent, as a subsequent event once > the thread has reached DEAD and been removed from the threads list will cause > its memory to be released. Of course, if the subsequent event is another > thread removal or replacement, then we just have a new memory leak.
[GitHub] [kafka] mumrah commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check
mumrah commented on pull request #10354: URL: https://github.com/apache/kafka/pull/10354#issuecomment-802229547 Maybe we should exclude all markdown files from the license check?
[jira] [Created] (KAFKA-12500) Cache memory is leaked after removing/replacing a StreamThread
A. Sophie Blee-Goldman created KAFKA-12500: -- Summary: Cache memory is leaked after removing/replacing a StreamThread Key: KAFKA-12500 URL: https://issues.apache.org/jira/browse/KAFKA-12500 Project: Kafka Issue Type: Bug Components: streams Reporter: A. Sophie Blee-Goldman Fix For: 2.8.0 We currently leak the memory given to a StreamThread for its cache after it gets shut down due to being removed or replaced. Removal: If the StreamThread is removing itself, or if a thread is being removed by an external caller but fails to shut down within the allotted time, it won't be removed from the `threads` list to avoid freeing up its thread id while still running. If the thread hasn't reached the DEAD state when we resize the cache, the new cache size per thread is computed based on the number of live threads, which includes the removed thread at this point. Replacement: When a thread is replaced, we first shut down that thread but hold off on removing it from the threads list until it's DEAD. Immediately after the shutdown we call addStreamThread to start up a new thread, which resizes the cache according to num-live-threads + 1. In both of these cases, the cache memory of the shutting down thread won't be made available to the remaining threads. Note: in general this leak may not be permanent, as a subsequent event once the thread has reached DEAD and been removed from the threads list will cause its memory to be released. Of course, if the subsequent event is another thread removal or replacement, then we just have a new memory leak.
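To make the arithmetic of the replacement case concrete, here is a small sketch. It is not the KafkaStreams implementation: `CacheResizeSketch`, its simplified `State` enum, and the even split in `cacheSizePerThread` are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the resize arithmetic, not the KafkaStreams code.
final class CacheResizeSketch {

    enum State { RUNNING, PENDING_SHUTDOWN, DEAD }

    // Counting rule described in the ticket: only DEAD threads are excluded.
    static int countNotDead(List<State> states) {
        int n = 0;
        for (State s : states) {
            if (s != State.DEAD) n++;
        }
        return n;
    }

    // Counting rule proposed in the fix: PENDING_SHUTDOWN is excluded too.
    static int countLive(List<State> states) {
        int n = 0;
        for (State s : states) {
            if (s != State.DEAD && s != State.PENDING_SHUTDOWN) n++;
        }
        return n;
    }

    // Assumed even split of the total cache across threads.
    static long cacheSizePerThread(long totalCacheBytes, int numThreads) {
        return numThreads == 0 ? totalCacheBytes : totalCacheBytes / numThreads;
    }

    public static void main(String[] args) {
        // One of three threads is being replaced and has not reached DEAD yet.
        List<State> states = Arrays.asList(
            State.RUNNING, State.RUNNING, State.PENDING_SHUTDOWN);
        System.out.println(cacheSizePerThread(120, countNotDead(states) + 1)); // 30
        System.out.println(cacheSizePerThread(120, countLive(states) + 1));    // 40
    }
}
```

With 120 bytes of total cache, counting the shutting-down thread gives 30 bytes to each of four slots, so the three threads that keep running use only 90 bytes. Excluding it gives 40 bytes to each of the three threads, which uses the full 120.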
[GitHub] [kafka] cmccabe commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check
cmccabe commented on pull request #10354: URL: https://github.com/apache/kafka/pull/10354#issuecomment-802214287 Thanks, @jolshan
[GitHub] [kafka] vvcephei commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check
vvcephei commented on pull request #10354: URL: https://github.com/apache/kafka/pull/10354#issuecomment-802212427 Thanks, @jolshan ! Will this need to be cherry-picked to 2.8?
[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch
rohitrmd commented on a change in pull request #10276: URL: https://github.com/apache/kafka/pull/10276#discussion_r597158051 ## File path: raft/src/main/java/org/apache/kafka/raft/ValidOffsetAndEpoch.java ## @@ -16,40 +16,56 @@ */ package org.apache.kafka.raft; +import java.util.Objects; + public final class ValidOffsetAndEpoch { -final private Type type; +final private Kind kind; Review comment: I agree with the existing design, given the limitations of interfaces compared to case classes. Java 15 has sealed classes (https://openjdk.java.net/jeps/360), which would be a good alternative to interfaces since they would let us limit extension.
[GitHub] [kafka] hachikuji commented on pull request #10354: MINOR: Exclude KIP-500.md from rat check
hachikuji commented on pull request #10354: URL: https://github.com/apache/kafka/pull/10354#issuecomment-802206799 Thanks for fixing this so quickly.
[GitHub] [kafka] ijuma merged pull request #10349: MINOR: Move `configurations.all` to be a child of `allprojects`
ijuma merged pull request #10349: URL: https://github.com/apache/kafka/pull/10349
[GitHub] [kafka] jolshan opened a new pull request #10354: MINOR: Exclude KIP-500.md from rat check
jolshan opened a new pull request #10354: URL: https://github.com/apache/kafka/pull/10354 Builds are failing since this file does not have a license. Similar .md files do not seem to have licenses, so I've added this file to the exclude list. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rohitrmd commented on a change in pull request #10276: KAFKA-12253: Add tests that cover all of the cases for ReplicatedLog's validateOffsetAndEpoch
rohitrmd commented on a change in pull request #10276: URL: https://github.com/apache/kafka/pull/10276#discussion_r597144327 ## File path: core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala ## @@ -485,6 +485,170 @@ final class KafkaMetadataLogTest { batchBuilder.build() } + @Test + def testValidateEpochGreaterThanLastKnownEpoch(): Unit = { +val log = buildMetadataLog(tempDir, mockTime) + +val numberOfRecords = 1 +val epoch = 1 + +append(log, numberOfRecords, epoch) + +val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch + 1) +assertEquals(ValidOffsetAndEpoch.Kind.DIVERGING, resultOffsetAndEpoch.kind) +assertEquals(new OffsetAndEpoch(log.endOffset.offset, epoch), resultOffsetAndEpoch.offsetAndEpoch()) + } + + @Test + def testValidateEpochLessThanOldestSnapshotEpoch(): Unit = { +val log = buildMetadataLog(tempDir, mockTime) + +val numberOfRecords = 10 +val epoch = 1 + +append(log, numberOfRecords, epoch) +log.updateHighWatermark(new LogOffsetMetadata(numberOfRecords)) + +val snapshotId = new OffsetAndEpoch(numberOfRecords, epoch) +TestUtils.resource(log.createSnapshot(snapshotId)) { snapshot => + snapshot.freeze() +} +assertTrue(log.deleteBeforeSnapshot(snapshotId)) + +val resultOffsetAndEpoch = log.validateOffsetAndEpoch(numberOfRecords, epoch - 1) +assertEquals(ValidOffsetAndEpoch.Kind.SNAPSHOT, resultOffsetAndEpoch.kind) +assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch()) Review comment: @hachikuji can you explain what you mean by using snapshotId? Do you mean changing `assertEquals(new OffsetAndEpoch(numberOfRecords, epoch), resultOffsetAndEpoch.offsetAndEpoch())` to `assertEquals(new OffsetAndEpoch(snapshotId.offset, snapshotId.epoch), resultOffsetAndEpoch.offsetAndEpoch())` rather than using the variables?
[GitHub] [kafka] mumrah opened a new pull request #10353: DO NOT MERGE: Add a failing unit test to check Gradle and Github behavior
mumrah opened a new pull request #10353: URL: https://github.com/apache/kafka/pull/10353
[jira] [Resolved] (KAFKA-3813) Let SourceConnector implementations access the offset reader
[ https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-3813. -- Resolution: Duplicate > Let SourceConnector implementations access the offset reader > > > Key: KAFKA-3813 > URL: https://issues.apache.org/jira/browse/KAFKA-3813 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > > When a source connector is started, having access to the > {{OffsetStorageReader}} would allow it to more intelligently configure its > tasks based upon the stored offsets. (Currently only the {{SourceTask}} > implementations can access the offset reader (via the {{SourceTaskContext}}), > but the {{SourceConnector}} does not have access to the offset reader.) > Of course, accessing the stored offsets is not likely to be useful for sink > connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Reopened] (KAFKA-3813) Let SourceConnector implementations access the offset reader
[ https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch reopened KAFKA-3813: -- > Let SourceConnector implementations access the offset reader > > > Key: KAFKA-3813 > URL: https://issues.apache.org/jira/browse/KAFKA-3813 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > > When a source connector is started, having access to the > {{OffsetStorageReader}} would allow it to more intelligently configure its > tasks based upon the stored offsets. (Currently only the {{SourceTask}} > implementations can access the offset reader (via the {{SourceTaskContext}}), > but the {{SourceConnector}} does not have access to the offset reader.) > Of course, accessing the stored offsets is not likely to be useful for sink > connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-3813) Let SourceConnector implementations access the offset reader
[ https://issues.apache.org/jira/browse/KAFKA-3813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17304330#comment-17304330 ] Randall Hauch commented on KAFKA-3813: -- [KIP-131|https://cwiki.apache.org/confluence/display/KAFKA/KIP-131+-+Add+access+to+OffsetStorageReader+from+SourceConnector] was implemented with KAFKA-4794. > Let SourceConnector implementations access the offset reader > > > Key: KAFKA-3813 > URL: https://issues.apache.org/jira/browse/KAFKA-3813 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Affects Versions: 0.10.0.0 >Reporter: Randall Hauch >Priority: Major > Labels: needs-kip > > When a source connector is started, having access to the > {{OffsetStorageReader}} would allow it to more intelligently configure its > tasks based upon the stored offsets. (Currently only the {{SourceTask}} > implementations can access the offset reader (via the {{SourceTaskContext}}), > but the {{SourceConnector}} does not have access to the offset reader.) > Of course, accessing the stored offsets is not likely to be useful for sink > connectors. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] hachikuji merged pull request #10344: MINOR: Remove use of NoSuchElementException
hachikuji merged pull request #10344: URL: https://github.com/apache/kafka/pull/10344