[GitHub] [kafka] Playerharperb commented on pull request #13162: fix: replace an inefficient loop in kafka internals
Playerharperb commented on PR #13162: URL: https://github.com/apache/kafka/pull/13162#issuecomment-1403125466 Hello my name is playerharp...@gmail.com and I am interested in this job posting on the website -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14651) Add connectorDeleted flag when stopping tasks
[ https://issues.apache.org/jira/browse/KAFKA-14651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Hector Geraldino updated KAFKA-14651: - Description: Jira ticket for [KIP-901|https://cwiki.apache.org/confluence/display/KAFKA/KIP-901%3A+Add+connectorDeleted+flag+when+stopping+tasks] It would be useful for Connectors to know when their instance is being deleted. This will give a chance to connector tasks to perform any cleanup routines as part of the connector removal process. was:It would be useful for Connectors to know when their instance is being deleted. This will give a chance to connectors to perform any cleanup tasks (e.g. deleting external resources, or deleting offsets) before the connector is completely removed from the cluster. > Add connectorDeleted flag when stopping tasks > - > > Key: KAFKA-14651 > URL: https://issues.apache.org/jira/browse/KAFKA-14651 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Hector Geraldino >Assignee: Hector Geraldino >Priority: Minor > > Jira ticket for > [KIP-901|https://cwiki.apache.org/confluence/display/KAFKA/KIP-901%3A+Add+connectorDeleted+flag+when+stopping+tasks] > It would be useful for Connectors to know when their instance is being deleted. > This will give a chance to connector tasks to perform any cleanup routines > as part of the connector removal process. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14651) Add connectorDeleted flag when stopping tasks
Hector Geraldino created KAFKA-14651: Summary: Add connectorDeleted flag when stopping tasks Key: KAFKA-14651 URL: https://issues.apache.org/jira/browse/KAFKA-14651 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Hector Geraldino Assignee: Hector Geraldino It would be useful for Connectors to know when its instance is being deleted. This will give a chance to connectors to perform any cleanup tasks (e.g. deleting external resources, or deleting offsets) before the connector is completely removed from the cluster.
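KIP-901's actual API is not shown in this thread, so the following is only a rough, hypothetical illustration of what a connectorDeleted flag could look like on a task's stop path (all type and method names here are invented for the sketch, not Connect's real interfaces):

```java
// Hypothetical sketch only: KIP-901's final API may differ. The idea is an
// overloaded stop() that tells the task whether the whole connector is being
// deleted, so cleanup of external resources runs only on true deletion.
import java.util.ArrayList;
import java.util.List;

interface Task {
    void stop();                                              // existing contract
    default void stop(boolean connectorDeleted) { stop(); }   // hypothetical overload
}

class ExternalResourceTask implements Task {
    final List<String> log = new ArrayList<>();

    @Override
    public void stop() {
        log.add("stopped");
    }

    @Override
    public void stop(boolean connectorDeleted) {
        stop();
        if (connectorDeleted) {
            // only now is it safe to delete external resources / offsets
            log.add("cleaned up external resources");
        }
    }
}

public class ConnectorDeletedSketch {
    public static void main(String[] args) {
        ExternalResourceTask rebalanced = new ExternalResourceTask();
        rebalanced.stop(false);     // ordinary stop (e.g. rebalance): no cleanup
        ExternalResourceTask deleted = new ExternalResourceTask();
        deleted.stop(true);         // connector removed: cleanup runs
        System.out.println(rebalanced.log);
        System.out.println(deleted.log);
    }
}
```

The default method keeps the overload backward-compatible: existing tasks that only implement `stop()` keep working unchanged.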
[GitHub] [kafka] rayokota commented on pull request #12126: KAFKA-8713 KIP-581: Add new conf serialize.accept.optional.null in connect-json
rayokota commented on PR #12126: URL: https://github.com/apache/kafka/pull/12126#issuecomment-1403014501 @blueberrysugarhigh , sorry I'm not the author of this PR and I haven't tried it myself.
[GitHub] [kafka] blueberrysugarhigh commented on pull request #12126: KAFKA-8713 KIP-581: Add new conf serialize.accept.optional.null in connect-json
blueberrysugarhigh commented on PR #12126: URL: https://github.com/apache/kafka/pull/12126#issuecomment-1402996186 > @rayokota sorry for bothering you. I'm trying to maintain my own version of `JsonConverter`. I just copied the code and pasted it to a new class `MyJsonConverter`, and I replaced the following line > > https://github.com/a0x8o/kafka/blob/283d07f9d2577e78b7aab7a419818cf9cda19d0a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L664 > > with > > ```java > obj.set(field.name(), this.convertToJson(field.schema(), struct.getWithoutDefault(field.name()))); > ``` > > Then I changed my debezium config to > > ``` > ... > "key.converter": "org.apache.kafka.connect.json.MyJsonConverter", > "key.converter.schemas.enabled": true, > "value.converter": "org.apache.kafka.connect.json.MyJsonConverter", > "value.converter.schemas.enabled": true, > ``` > > but it seems it still does not work. I still get the default value for a nullable field. Do you know why? What else do I need to do? Have you been able to make this work? I just ran into the same issue. I upgraded my schema registry version and tried using the configuration in the connector, but it still was showing the wrong data.
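For context on why swapping that one call matters: Connect's `Struct.get(field)` substitutes the field's schema default when the stored value is null, while `Struct.getWithoutDefault(field)` returns the raw (possibly null) value. The following is a stdlib-only analogy of that behavior, not the real Connect classes:

```java
// Stdlib-only analogy (hypothetical, not org.apache.kafka.connect.data.Struct):
// get() falls back to the schema default on null, getWithoutDefault() does not.
// This difference is what the converter change above is trying to exploit.
import java.util.HashMap;
import java.util.Map;

public class StructAnalogy {
    static final Map<String, Object> VALUES = new HashMap<>();
    static final Map<String, Object> DEFAULTS = Map.of("city", "unknown");

    static Object get(String field) {
        Object v = VALUES.get(field);
        return v != null ? v : DEFAULTS.get(field);     // default substituted
    }

    static Object getWithoutDefault(String field) {
        return VALUES.get(field);                       // raw value, may be null
    }

    public static void main(String[] args) {
        VALUES.put("city", null);                       // nullable field left unset
        System.out.println(get("city"));                // prints the default
        System.out.println(getWithoutDefault("city"));  // prints null
    }
}
```

If the custom converter still emits defaults, a likely culprit is that the modified class is not actually the one being loaded (classpath ordering), or the config key is misspelled; the real JsonConverter option is `schemas.enable`, not `schemas.enabled`.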
[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680403#comment-17680403 ] A. Sophie Blee-Goldman commented on KAFKA-14533: I'll give it a few more days before re-enabling both parameters, but so far I've got a few runs in with only the `false` parameter enabled and this seems to have fixed the flakiness. Can't really envision why the state updater would/could cause the listOffsets request to fail in the way shown above, but it really does appear to be something about enabling this that so badly broke the SmokeTestDriverIntegrationTest. Definitely need to look into this before we consider publicly releasing the state updater feature cc [~cadonna] [~lbrutschy] [~guozhang] > Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance > - > > Key: KAFKA-14533 > URL: https://issues.apache.org/jira/browse/KAFKA-14533 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Greg Harris >Priority: Major > Labels: flaky-test > > The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent > runs: > ``` > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > ``` > The stacktrace appears to be: > ``` > java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed > out after 600 seconds > at > org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29) > at > org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > ... > Suppressed: java.lang.InterruptedException: sleep interrupted > at java.lang.Thread.sleep(Native Method) > at > org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) > ... 
134 more > ``` > The test appears to be timing out waiting for the SmokeTestClient to complete > its asynchronous close, and taking significantly longer to do so (600s > instead of 60s) than a typical local test execution time.
[GitHub] [kafka] cmccabe merged pull request #13151: Fail broker and controller startup on authorizer failure
cmccabe merged PR #13151: URL: https://github.com/apache/kafka/pull/13151
[jira] [Created] (KAFKA-14650) IQv2 can throw ConcurrentModificationException when accessing Tasks
A. Sophie Blee-Goldman created KAFKA-14650: -- Summary: IQv2 can throw ConcurrentModificationException when accessing Tasks Key: KAFKA-14650 URL: https://issues.apache.org/jira/browse/KAFKA-14650 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.4.0 Reporter: A. Sophie Blee-Goldman From failure in *[PositionRestartIntegrationTest.verifyStore[cache=false, log=true, supplier=IN_MEMORY_WINDOW, kind=PAPI]|https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/63/testReport/junit/org.apache.kafka.streams.integration/PositionRestartIntegrationTest/Build___JDK_11_and_Scala_2_13___verifyStore_cache_false__log_true__supplier_IN_MEMORY_WINDOW__kind_PAPI_/]* java.util.ConcurrentModificationException at java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208) at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244) at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239) at java.base/java.util.HashMap.putMapEntries(HashMap.java:508) at java.base/java.util.HashMap.putAll(HashMap.java:781) at org.apache.kafka.streams.processor.internals.Tasks.allTasksPerId(Tasks.java:361) at org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1537) at org.apache.kafka.streams.processor.internals.StreamThread.allTasks(StreamThread.java:1278) at org.apache.kafka.streams.KafkaStreams.query(KafkaStreams.java:1921) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.iqv2WaitForResult(IntegrationTestUtils.java:168) at org.apache.kafka.streams.integration.PositionRestartIntegrationTest.shouldReachExpectedPosition(PositionRestartIntegrationTest.java:438) at org.apache.kafka.streams.integration.PositionRestartIntegrationTest.verifyStore(PositionRestartIntegrationTest.java:423)
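The stack trace shows `HashMap.putAll` iterating a `TreeMap` whose entries are being mutated concurrently by a stream thread; `TreeMap`'s fail-fast iterator then throws. A minimal, single-threaded sketch of the same failure mode (nothing Kafka-specific) is:

```java
// Demonstrates the fail-fast behavior behind the stack trace above: any
// structural modification of a TreeMap while it is being iterated (which is
// exactly what HashMap.putAll does internally) raises a
// ConcurrentModificationException on the next iterator step.
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;

public class CmeDemo {
    public static void main(String[] args) {
        TreeMap<Integer, String> tasks = new TreeMap<>();
        tasks.put(1, "task-1");
        tasks.put(2, "task-2");

        boolean caught = false;
        try {
            for (Map.Entry<Integer, String> e : tasks.entrySet()) {
                tasks.put(3, "task-3");   // structural modification mid-iteration
            }
        } catch (ConcurrentModificationException ex) {
            caught = true;                // fail-fast iterator detected the change
        }
        System.out.println("CME caught: " + caught);
    }
}
```

Typical fixes are to snapshot the map while holding the same lock that guards the writers, or to use a weakly-consistent structure such as `ConcurrentSkipListMap`, whose iterators never throw `ConcurrentModificationException`.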
[GitHub] [kafka] guozhangwang commented on pull request #13160: chore: fix flaky DefaultStateUpdaterTest
guozhangwang commented on PR #13160: URL: https://github.com/apache/kafka/pull/13160#issuecomment-1402827998 LGTM!
[GitHub] [kafka] guozhangwang merged pull request #13160: chore: fix flaky DefaultStateUpdaterTest
guozhangwang merged PR #13160: URL: https://github.com/apache/kafka/pull/13160
[jira] [Updated] (KAFKA-14649) Failures instantiating Connect plugins hides other plugins from REST API, or crash worker
[ https://issues.apache.org/jira/browse/KAFKA-14649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14649: Description: Connect plugin path scanning evaluates the version() method of plugins to determine which version of a plugin to load, and what version to advertise as part of the REST API. This process involves reflectively constructing an instance of the class and calling the version method, which can fail in the following scenarios: 1. If a plugin throws an exception from a static initialization block 2. If a plugin does not have a default constructor (such as a non-static inner class) 3. If a plugin has a default constructor is not public 4. If a plugin throws an exception from the default constructor 5. If a plugin's version method throws an exception If any of the above is true for any single connector or rest extension on the classpath or plugin.path, the plugin path scanning will exit early, and potentially hide other unrelated plugins. This is primarily an issue in development and test environments, because they are easy-to-make code mistakes that would generally not make it to a release. Exceptions from the version method, however, can cause the worker to fail to start up as they are uncaught. It is desirable for the worker to instead log these exceptions and continue. This will prevent one mis-implemented plugin from affecting other plugins, while still causing integration tests to fail against the plugin itself. We can augment logging to make it clear how to correct these failures, where before it was rather opaque and difficult to debug. was: Connect plugin path scanning evaluates the version() method of plugins to determine which version of a plugin to load, and what version to advertise as part of the REST API. This process involves reflectively constructing an instance of the class and calling the version method, which can fail in the following scenarios: 1. 
If a plugin throws an exception from a static initialization block 2. If a plugin does not have a default constructor (such as a non-static inner class) 3. If a plugin has a default constructor is not public 4. If a plugin throws an exception from the default constructor 5. If a plugin's version method throws an exception If any of the above is true for any single connector or rest extension on the classpath or plugin.path, the plugin path scanning will exit early, and potentially hide other unrelated plugins. This is primarily an issue in development and test environments, because they are easy-to-make code mistakes that would generally not make it to a release. Exceptions from the version method, however, can cause the worker to throw It is desirable for the worker to instead log these exceptions and continue. This will prevent one mis-implemented plugin from affecting other plugins, while still causing integration tests to fail against the plugin itself. We can augment logging to make it clear how to correct these failures, where before it was rather opaque and difficult to debug. > Failures instantiating Connect plugins hides other plugins from REST API, or > crash worker > - > > Key: KAFKA-14649 > URL: https://issues.apache.org/jira/browse/KAFKA-14649 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > Connect plugin path scanning evaluates the version() method of plugins to > determine which version of a plugin to load, and what version to advertise as > part of the REST API. This process involves reflectively constructing an > instance of the class and calling the version method, which can fail in the > following scenarios: > 1. If a plugin throws an exception from a static initialization block > 2. If a plugin does not have a default constructor (such as a non-static > inner class) > 3. 
If a plugin has a default constructor is not public > 4. If a plugin throws an exception from the default constructor > 5. If a plugin's version method throws an exception > If any of the above is true for any single connector or rest extension on the > classpath or plugin.path, the plugin path scanning will exit early, and > potentially hide other unrelated plugins. This is primarily an issue in > development and test environments, because they are easy-to-make code > mistakes that would generally not make it to a release. Exceptions from the > version method, however, can cause the worker to fail to start up as they are > uncaught. > It is desirable for the worker to instead log these exceptions and continue. > This will prevent one mis-implemented plugin from affecting other plugins,
[jira] [Updated] (KAFKA-14649) Failures instantiating Connect plugins hides other plugins from REST API, or crash worker
[ https://issues.apache.org/jira/browse/KAFKA-14649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14649: Summary: Failures instantiating Connect plugins hides other plugins from REST API, or crash worker (was: Failures instantiating Connect plugins hides other plugins from REST API) > Failures instantiating Connect plugins hides other plugins from REST API, or > crash worker > - > > Key: KAFKA-14649 > URL: https://issues.apache.org/jira/browse/KAFKA-14649 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > Connect plugin path scanning evaluates the version() method of plugins to > determine which version of a plugin to load, and what version to advertise as > part of the REST API. This process involves reflectively constructing an > instance of the class and calling the version method, which can fail in the > following scenarios: > 1. If a plugin throws an exception from a static initialization block > 2. If a plugin does not have a default constructor (such as a non-static > inner class) > 3. If a plugin has a default constructor is not public > 4. If a plugin throws an exception from the default constructor > 5. If a plugin's version method throws an exception > If any of the above is true for any single connector or rest extension on the > classpath or plugin.path, the plugin path scanning will exit early, and > potentially hide other unrelated plugins. This is primarily an issue in > development and test environments, because they are easy-to-make code > mistakes that would generally not make it to a release. Exceptions from the > version method, however, can cause the worker to throw > It is desirable for the worker to instead log these exceptions and continue. 
> This will prevent one mis-implemented plugin from affecting other plugins, > while still causing integration tests to fail against the plugin itself. We > can augment logging to make it clear how to correct these failures, where > before it was rather opaque and difficult to debug.
[jira] [Updated] (KAFKA-14649) Failures instantiating Connect plugins hides other plugins from REST API
[ https://issues.apache.org/jira/browse/KAFKA-14649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14649: Description: Connect plugin path scanning evaluates the version() method of plugins to determine which version of a plugin to load, and what version to advertise as part of the REST API. This process involves reflectively constructing an instance of the class and calling the version method, which can fail in the following scenarios: 1. If a plugin throws an exception from a static initialization block 2. If a plugin does not have a default constructor (such as a non-static inner class) 3. If a plugin has a default constructor is not public 4. If a plugin throws an exception from the default constructor 5. If a plugin's version method throws an exception If any of the above is true for any single connector or rest extension on the classpath or plugin.path, the plugin path scanning will exit early, and potentially hide other unrelated plugins. This is primarily an issue in development and test environments, because they are easy-to-make code mistakes that would generally not make it to a release. Exceptions from the version method, however, can cause the worker to throw It is desirable for the worker to instead log these exceptions and continue. This will prevent one mis-implemented plugin from affecting other plugins, while still causing integration tests to fail against the plugin itself. We can augment logging to make it clear how to correct these failures, where before it was rather opaque and difficult to debug. was: Connect plugin path scanning evaluates the version() method of plugins to determine which version of a plugin to load, and what version to advertise as part of the REST API. This process involves reflectively constructing an instance of the class and calling the version method, which can fail in the following scenarios: 1. If a plugin throws an exception from a static initialization block 2. 
If a plugin does not have a default constructor (such as a non-static inner class) 3. If a plugin has a default constructor is not public 4. If a plugin throws an exception from the default constructor 5. If a plugin's version method throws an exception If any of the above is true for any single connector or rest extension on the classpath or plugin.path, the worker will fail to start up entirely. This is primarily an issue in development and test environments, because they are easy-to-make code mistakes that would generally not make it to a release. It is desirable for the worker to instead log these exceptions and continue. This will prevent one mis-implemented plugin from affecting other plugins, while still causing integration tests to fail against the plugin itself. We can augment logging to make it clear how to correct these failures, where before it was rather opaque and difficult to debug. > Failures instantiating Connect plugins hides other plugins from REST API > > > Key: KAFKA-14649 > URL: https://issues.apache.org/jira/browse/KAFKA-14649 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > Connect plugin path scanning evaluates the version() method of plugins to > determine which version of a plugin to load, and what version to advertise as > part of the REST API. This process involves reflectively constructing an > instance of the class and calling the version method, which can fail in the > following scenarios: > 1. If a plugin throws an exception from a static initialization block > 2. If a plugin does not have a default constructor (such as a non-static > inner class) > 3. If a plugin has a default constructor is not public > 4. If a plugin throws an exception from the default constructor > 5. 
If a plugin's version method throws an exception > If any of the above is true for any single connector or rest extension on the > classpath or plugin.path, the plugin path scanning will exit early, and > potentially hide other unrelated plugins. This is primarily an issue in > development and test environments, because they are easy-to-make code > mistakes that would generally not make it to a release. Exceptions from the > version method, however, can cause the worker to throw > It is desirable for the worker to instead log these exceptions and continue. > This will prevent one mis-implemented plugin from affecting other plugins, > while still causing integration tests to fail against the plugin itself. We > can augment logging to make it clear how to correct these failures, where > before it was rather opaque and difficult to debug.
[jira] [Updated] (KAFKA-14649) Failures instantiating Connect plugins hides other plugins from REST API
[ https://issues.apache.org/jira/browse/KAFKA-14649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Greg Harris updated KAFKA-14649: Summary: Failures instantiating Connect plugins hides other plugins from REST API (was: Failures instantiating Connect plugins crash worker on startup) > Failures instantiating Connect plugins hides other plugins from REST API > > > Key: KAFKA-14649 > URL: https://issues.apache.org/jira/browse/KAFKA-14649 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Minor > > Connect plugin path scanning evaluates the version() method of plugins to > determine which version of a plugin to load, and what version to advertise as > part of the REST API. This process involves reflectively constructing an > instance of the class and calling the version method, which can fail in the > following scenarios: > 1. If a plugin throws an exception from a static initialization block > 2. If a plugin does not have a default constructor (such as a non-static > inner class) > 3. If a plugin has a default constructor is not public > 4. If a plugin throws an exception from the default constructor > 5. If a plugin's version method throws an exception > If any of the above is true for any single connector or rest extension on the > classpath or plugin.path, the worker will fail to start up entirely. This is > primarily an issue in development and test environments, because they are > easy-to-make code mistakes that would generally not make it to a release. > It is desirable for the worker to instead log these exceptions and continue. > This will prevent one mis-implemented plugin from affecting other plugins, > while still causing integration tests to fail against the plugin itself. We > can augment logging to make it clear how to correct these failures, where > before it was rather opaque and difficult to debug. 
[jira] [Created] (KAFKA-14649) Failures instantiating Connect plugins crash worker on startup
Greg Harris created KAFKA-14649: --- Summary: Failures instantiating Connect plugins crash worker on startup Key: KAFKA-14649 URL: https://issues.apache.org/jira/browse/KAFKA-14649 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 3.3.0, 3.2.0, 3.0.0, 3.1.0, 3.4.0 Reporter: Greg Harris Assignee: Greg Harris Connect plugin path scanning evaluates the version() method of plugins to determine which version of a plugin to load, and what version to advertise as part of the REST API. This process involves reflectively constructing an instance of the class and calling the version method, which can fail in the following scenarios: 1. If a plugin throws an exception from a static initialization block 2. If a plugin does not have a default constructor (such as a non-static inner class) 3. If a plugin has a default constructor that is not public 4. If a plugin throws an exception from the default constructor 5. If a plugin's version method throws an exception If any of the above is true for any single connector or rest extension on the classpath or plugin.path, the worker will fail to start up entirely. This is primarily an issue in development and test environments, because these are easy-to-make code mistakes that would generally not make it to a release. It is desirable for the worker to instead log these exceptions and continue. This will prevent one mis-implemented plugin from affecting other plugins, while still causing integration tests to fail against the plugin itself. We can augment logging to make it clear how to correct these failures, where before it was rather opaque and difficult to debug.
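The proposed "log and continue" behavior can be sketched with plain JDK reflection (this is not Connect's actual scanner; class names here are invented for the demo): wrap the reflective instantiation of each candidate in a try/catch so one broken plugin cannot abort the whole scan.

```java
// Hedged sketch of per-plugin failure isolation during scanning. The failure
// modes listed in the ticket (no default constructor, throwing constructor,
// non-public constructor, throwing static initializer) all surface here as
// ReflectiveOperationException, RuntimeException, or LinkageError.
import java.util.ArrayList;
import java.util.List;

public class PluginScanSketch {
    // Failure mode 2/3: no usable default constructor.
    static class NoDefaultCtor { NoDefaultCtor(int unused) {} }
    // Failure mode 4: constructor throws.
    static class ThrowingCtor { ThrowingCtor() { throw new IllegalStateException("boom"); } }
    // Well-behaved plugin.
    public static class GoodPlugin { public GoodPlugin() {} }

    static List<String> scan(Class<?>... candidates) {
        List<String> loaded = new ArrayList<>();
        for (Class<?> c : candidates) {
            try {
                c.getDeclaredConstructor().newInstance();  // may fail per plugin
                loaded.add(c.getSimpleName());
            } catch (ReflectiveOperationException | RuntimeException | LinkageError t) {
                // log and continue: one broken plugin must not hide the others
                System.out.println("skipping " + c.getSimpleName() + ": " + t);
            }
        }
        return loaded;
    }

    public static void main(String[] args) {
        System.out.println(scan(NoDefaultCtor.class, ThrowingCtor.class, GoodPlugin.class));
    }
}
```

Only `GoodPlugin` survives the scan; the other two are logged and skipped instead of crashing the worker.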
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments
vcrfxia commented on code in PR #13143: URL: https://github.com/apache/kafka/pull/13143#discussion_r1085990051 ## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java: ## @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.state.internals; + +import java.nio.ByteBuffer; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.rocksdb.RocksDBException; +import org.rocksdb.WriteBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This "logical segment" is a segment which shares its underlying physical store with other + * logical segments. 
Each segment uses a unique, fixed-length key prefix derived from the + * segment ID when writing to the shared physical store. + */ +class LogicalKeyValueSegment implements Comparable, Segment { +private static final Logger log = LoggerFactory.getLogger(LogicalKeyValueSegment.class); + +public final long id; +private final String name; +private final RocksDBStore physicalStore; +private final PrefixKeyFormatter prefixKeyFormatter; + +private volatile boolean open = false; +final Set> openIterators = Collections.synchronizedSet(new HashSet<>()); + +LogicalKeyValueSegment(final long id, + final String name, + final RocksDBStore physicalStore) { +this.id = id; +this.name = name; +this.physicalStore = Objects.requireNonNull(physicalStore); + +this.prefixKeyFormatter = new PrefixKeyFormatter(serializeLongToBytes(id)); +} + +void openDB() { +open = true; +} + +@Override +public int compareTo(final LogicalKeyValueSegment segment) { +return Long.compare(id, segment.id); +} + +@Override +public synchronized void destroy() { +final Bytes keyPrefix = prefixKeyFormatter.getPrefix(); + +// this is a prefix deletion, because the deleteRange() implementation +// calls Bytes.increment() in order to make keyTo inclusive +physicalStore.deleteRange(keyPrefix, keyPrefix); +} + +@Override +public synchronized void deleteRange(final Bytes keyFrom, final Bytes keyTo) { +physicalStore.deleteRange( +prefixKeyFormatter.forPhysicalStore(keyFrom), +prefixKeyFormatter.forPhysicalStore(keyTo)); +} + +@Override +public synchronized void put(final Bytes key, final byte[] value) { +physicalStore.put( +prefixKeyFormatter.forPhysicalStore(key), +value); +} + +@Override +public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) { +return physicalStore.putIfAbsent( +prefixKeyFormatter.forPhysicalStore(key), +value); +} + +@Override +public synchronized void putAll(final List> entries) { +physicalStore.putAll(entries.stream() +.map(kv -> new KeyValue<>( 
+prefixKeyFormatter.forPhysicalStore(kv.key), +kv.value)) +.collect(Collectors.toList())); +} + +@Override +public synchronized byte[] delete(final Bytes key) { +return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key)); +} + +@Override +public String name() { +return name; +} + +@Deprecated +@Override +public void init(final ProcessorContext context, final StateStore root) { +throw new UnsupportedOperationException("cannot initialize a logical segment"); +} + +@Override +public void flush() { +throw new UnsupportedOperationException("nothing to flush for logical segment"); +} + +
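The prefixing scheme described in the Javadoc above can be sketched in plain Java. This is a minimal illustration only: `PrefixSketch`, `toPhysical`, and `toLogical` are hypothetical names, and the real Kafka code works with `Bytes` wrappers through a `PrefixKeyFormatter` rather than raw arrays.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Sketch of the logical-segment key scheme: each segment prepends a
// fixed-length (8-byte) big-endian segment ID to every key before
// writing to the shared physical store, so prefix scans and prefix
// deletions can isolate one segment's data.
public class PrefixSketch {
    static final int PREFIX_LEN = Long.BYTES;

    // Build the physical key: [8-byte segment id][logical key]
    static byte[] toPhysical(long segmentId, byte[] logicalKey) {
        return ByteBuffer.allocate(PREFIX_LEN + logicalKey.length)
                .putLong(segmentId)
                .put(logicalKey)
                .array();
    }

    // Recover the logical key by dropping the fixed-length prefix
    static byte[] toLogical(byte[] physicalKey) {
        return Arrays.copyOfRange(physicalKey, PREFIX_LEN, physicalKey.length);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes();
        byte[] physical = toPhysical(7L, key);
        assert physical.length == key.length + PREFIX_LEN;
        assert Arrays.equals(toLogical(physical), key);
        System.out.println("round-trip ok");
    }
}
```

Because the prefix is fixed-length, keys from different segments never interleave, which is what makes the `deleteRange(keyPrefix, keyPrefix)` prefix deletion in `destroy()` work.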
[jira] [Created] (KAFKA-14648) Do not fail clients if bootstrap servers is not immediately resolvable
Jason Gustafson created KAFKA-14648: --- Summary: Do not fail clients if bootstrap servers is not immediately resolvable Key: KAFKA-14648 URL: https://issues.apache.org/jira/browse/KAFKA-14648 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson In dynamic environments, such as system tests, there is sometimes a delay between when a client is initialized and when the configured bootstrap servers become available in DNS. Currently clients will fail immediately if none of the bootstrap servers can resolve. It would be more convenient for these environments to provide a grace period to give more time for initialization. -- This message was sent by Atlassian Jira (v8.20.10#820010)
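The grace period proposed in KAFKA-14648 amounts to retrying resolution until a deadline instead of failing on the first miss. The sketch below is purely illustrative (the class and method names are invented, not Kafka code) and abstracts DNS behind a supplier so it is self-contained:

```java
import java.util.function.Supplier;

// Illustrative sketch of a bootstrap-resolution grace period: instead of
// failing immediately when no bootstrap server resolves, retry until a
// configurable deadline elapses.
public class BootstrapGrace {
    // Retries `resolve` until it returns true or graceMs elapses.
    static boolean awaitResolvable(Supplier<Boolean> resolve, long graceMs, long pollMs) {
        long deadline = System.currentTimeMillis() + graceMs;
        while (true) {
            if (resolve.get()) return true;           // DNS now resolves
            if (System.currentTimeMillis() >= deadline) return false;
            try {
                Thread.sleep(pollMs);                 // back off before retrying
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;                         // treat interruption as failure
            }
        }
    }

    public static void main(String[] args) {
        // Fake resolver that succeeds on the third attempt.
        int[] calls = {0};
        boolean ok = awaitResolvable(() -> ++calls[0] >= 3, 1_000, 10);
        assert ok && calls[0] == 3;
        System.out.println("resolved after " + calls[0] + " attempts");
    }
}
```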
[GitHub] [kafka] cmccabe commented on a diff in pull request #13140: KAFKA-14644: Process should crash after failure in Raft IO thread
cmccabe commented on code in PR #13140: URL: https://github.com/apache/kafka/pull/13140#discussion_r1085968353 ## core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala: ## @@ -198,7 +200,8 @@ class RaftManagerTest { @Test def testShutdownIoThread(): Unit = { val raftClient = mock(classOf[KafkaRaftClient[String]]) -val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft") +val faultHandler = mock(classOf[FaultHandler]) Review Comment: use `MockFaultHandler` here, then later in the test call `maybeRethrowFirstException` to ensure no faults happened -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
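The pattern behind the suggestion above (a fault handler that records faults instead of crashing, so the test can assert at the end that none occurred) can be sketched in plain Java. The class and method names here are illustrative stand-ins; Kafka's actual test helper is `MockFaultHandler` with `maybeRethrowFirstException`.

```java
// Sketch of a fault-recording handler for tests: instead of exiting the
// process on a fault (the production behavior), record the first fault
// and surface it at the end of the test.
public class RecordingFaultHandler {
    private Throwable firstFault;

    public synchronized void handleFault(String msg, Throwable cause) {
        if (firstFault == null) firstFault = new RuntimeException(msg, cause);
    }

    public synchronized boolean hasFault() {
        return firstFault != null;
    }

    // Called at the end of a test: fail if any fault was recorded.
    public synchronized void maybeRethrowFirstFault() {
        if (firstFault != null) throw new RuntimeException(firstFault);
    }

    public static void main(String[] args) {
        RecordingFaultHandler h = new RecordingFaultHandler();
        h.maybeRethrowFirstFault();            // no fault recorded: no throw
        h.handleFault("raft IO thread died", new IllegalStateException());
        boolean threw = false;
        try { h.maybeRethrowFirstFault(); } catch (RuntimeException e) { threw = true; }
        assert threw;
        System.out.println("fault surfaced at end of test");
    }
}
```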
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management
vcrfxia commented on code in PR #13142: URL: https://github.com/apache/kafka/pull/13142#discussion_r1085933946 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -351,13 +360,23 @@ public <R> QueryResult<R> query( @Override public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { +if (userManagedIterators) { +throw new IllegalStateException("Must specify openIterators in call to prefixScan()"); +} +return prefixScan(prefix, prefixKeySerializer, openIterators); +} + +<PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, + final PS prefixKeySerializer, + final Set<KeyValueIterator<Bytes, byte[]>> openIterators) { Review Comment: No, that's a good point. I've just added this additional validation, which required refactoring these calls so that the two versions of the method do not call each other. Instead they each call a third (helper) method.
[GitHub] [kafka] vcrfxia commented on a diff in pull request #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management
vcrfxia commented on code in PR #13142: URL: https://github.com/apache/kafka/pull/13142#discussion_r1085935187 ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java: ## @@ -383,7 +383,9 @@ public KeyValue<Bytes, byte[]> makeNext() { @Override public synchronized void close() { -openIterators.remove(this); +if (closeCallback != null) { Review Comment: Unclear whether we want to require that a closeCallback is always registered in general, but it is true that for these two specific classes (RocksDbIterator and RocksDBDualCFIterator), we do want to require that a closeCallback is always set. I've updated the code to reflect this. ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -351,13 +360,23 @@ public <R> QueryResult<R> query( @Override public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, final PS prefixKeySerializer) { +if (userManagedIterators) { +throw new IllegalStateException("Must specify openIterators in call to prefixScan()"); +} +return prefixScan(prefix, prefixKeySerializer, openIterators); +} + +<PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix, + final PS prefixKeySerializer, + final Set<KeyValueIterator<Bytes, byte[]>> openIterators) { Review Comment: No. I've just added this additional validation, which required refactoring these calls so that the two versions of the method do not call each other. Instead they each call a third (helper) method. ## streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java: ## @@ -114,6 +114,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore { private boolean userSpecifiedStatistics = false; private final RocksDBMetricsRecorder metricsRecorder; +private final boolean userManagedIterators; Review Comment: Good point. I think `selfManagedIterators` is also confusing though because it's unclear whether "self" means the store itself or the caller themselves. 
I've updated this to `autoManagedIterators` which means the opposite of what I initially had (for `userManagedIterators`) and added a comment.
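The refactor discussed in this thread (two overloads that no longer call each other, each validating its own mode and delegating to a shared private helper) can be sketched as follows. All names here are simplified placeholders; the real code operates on `KeyValueIterator<Bytes, byte[]>` sets inside `RocksDBStore`.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the "two public entry points, one private helper" pattern:
// the public method is only valid when the store auto-manages iterators,
// the package-private overload only when the caller manages them, and
// neither calls the other.
public class StoreSketch {
    private final boolean autoManagedIterators;
    private final Set<String> openIterators = new HashSet<>();

    StoreSketch(boolean autoManagedIterators) {
        this.autoManagedIterators = autoManagedIterators;
    }

    // Public API: only valid when the store manages iterators itself.
    public String prefixScan(String prefix) {
        if (!autoManagedIterators)
            throw new IllegalStateException("must pass openIterators explicitly");
        return doPrefixScan(prefix, openIterators);
    }

    // Package-private variant for user-managed iterators.
    String prefixScan(String prefix, Set<String> userIterators) {
        if (autoManagedIterators)
            throw new IllegalStateException("store manages iterators itself");
        return doPrefixScan(prefix, userIterators);
    }

    // Shared helper holding the real logic; no validation here.
    private String doPrefixScan(String prefix, Set<String> iterators) {
        String it = "iter:" + prefix;
        iterators.add(it);   // register so the store (or user) can close it later
        return it;
    }

    public static void main(String[] args) {
        StoreSketch auto = new StoreSketch(true);
        assert auto.prefixScan("a").equals("iter:a");
        boolean threw = false;
        try { auto.prefixScan("a", new HashSet<>()); } catch (IllegalStateException e) { threw = true; }
        assert threw;
        System.out.println("validation ok");
    }
}
```

Keeping the validation out of the helper is what lets each overload reject the wrong calling mode without the two public paths recursing into each other.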
[GitHub] [kafka] dpcollins-google opened a new pull request, #13162: fix: replace an inefficient loop in kafka internals
dpcollins-google opened a new pull request, #13162: URL: https://github.com/apache/kafka/pull/13162 Instead, use `Channels.newChannel` to write in larger chunks.
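The idea in this PR, replacing a byte-at-a-time copy loop with a bulk write through `java.nio.channels.Channels.newChannel`, looks roughly like this (a standalone illustration, not the actual Kafka patch):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

// Instead of draining a ByteBuffer into an OutputStream one byte at a
// time, wrap the stream with Channels.newChannel and transfer the whole
// remaining buffer in bulk.
public class BulkWrite {
    static byte[] copyViaChannel(ByteBuffer src) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (WritableByteChannel ch = Channels.newChannel(out)) {
            while (src.hasRemaining()) {
                ch.write(src);   // bulk transfer; loops only if the channel writes short
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] data = new byte[8192];
        for (int i = 0; i < data.length; i++) data[i] = (byte) i;
        byte[] copy = copyViaChannel(ByteBuffer.wrap(data));
        assert java.util.Arrays.equals(copy, data);
        System.out.println("copied " + copy.length + " bytes");
    }
}
```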
[GitHub] [kafka] Cerchie opened a new pull request, #13161: Kafka 14128
Cerchie opened a new pull request, #13161: URL: https://github.com/apache/kafka/pull/13161 In response to [14128](https://issues.apache.org/jira/browse/KAFKA-14128). Addresses it by moving the final catch condition into an else block. Testing strategy: I'm attempting a unit test first. I've cp'ed from the test above so I can use the MockTime and InternalTopicManager. Currently, when I run the test, it's throwing an error, the top line of which is: ``` expected org.apache.kafka.common.errors.TimeoutException to be thrown, but nothing was thrown ``` This is prior to the issue of figuring out the text of the error, which is my next step.
[GitHub] [kafka] lucasbru opened a new pull request, #13160: chore: fix flaky DefaultStateUpdaterTest
lucasbru opened a new pull request, #13160: URL: https://github.com/apache/kafka/pull/13160 Mockito should not make named topologies paused by default.
[GitHub] [kafka] rajinisivaram merged pull request #13119: KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
rajinisivaram merged PR #13119: URL: https://github.com/apache/kafka/pull/13119
[GitHub] [kafka] rajinisivaram commented on pull request #13119: KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging
rajinisivaram commented on PR #13119: URL: https://github.com/apache/kafka/pull/13119#issuecomment-1402448121 Test failures are not related; merging to trunk.
[GitHub] [kafka] nizhikov commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module
nizhikov commented on PR #13157: URL: https://github.com/apache/kafka/pull/13157#issuecomment-1402420416 I can only imagine users who excluded the mirror-client dependency for whatever reason :) I don't think we should consider such a case a blocker.
[GitHub] [kafka] mimaison commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module
mimaison commented on PR #13157: URL: https://github.com/apache/kafka/pull/13157#issuecomment-1402375492 The Config classes are not part of the public API. I suggested moving the interfaces without thinking too much about it. Have you considered whether this could break anything?
[GitHub] [kafka] mimaison commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools
mimaison commented on code in PR #13136: URL: https://github.com/apache/kafka/pull/13136#discussion_r1085565198 ## checkstyle/import-control.xml: ## @@ -347,7 +347,7 @@ - Review Comment: We seem to have the trailing space everywhere else, so maybe keep it here too ## core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala: ## @@ -801,21 +801,15 @@ object ConsumerGroupCommand extends Logging { partitionsToReset.map { topicPartition => logStartOffsets.get(topicPartition) match { case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, new OffsetAndMetadata(offset)) -case _ => { - CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") - Exit.exit(1) -} +case _ => ToolsUtils.printUsageAndDie(opts.parser, s"Error getting starting offset of topic partition: $topicPartition") Review Comment: Can you remove other changes not related to JmxTool? ## tools/src/main/java/org/apache/kafka/tools/JmxCommand.java: ## @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanFeatureInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import javax.rmi.ssl.SslRMIClientSocketFactory; +import java.io.IOException; +import java.net.MalformedURLException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A program for reading JMX metrics from a given endpoint. + * + * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool. + * See KAFKA-4620 for details. + */ +public class JmxCommand { +public static void main(String[] args) { +Exit.exit(mainNoExit(args)); +} + +static int mainNoExit(String... args) { +try { +execute(args); +return 0; +} catch (TerseException e) { +System.err.println(e.getMessage()); +return 1; +} catch (Throwable e) { +System.err.println(e.getMessage()); +System.err.println(Utils.stackTrace(e)); +return 1; +} +} + +static void execute(String... 
args) throws Exception { +JmxCommandOptions options = new JmxCommandOptions(args); +CommandLineUtils.printHelpAndExitIfNeeded(options, "Dump JMX values to standard output."); + +Optional attributesInclude = options.attributesInclude(); +Optional dateFormat = options.dateFormat(); +String reportFormat = options.parseFormat(); +boolean keepGoing = true; + +MBeanServerConnection conn = connectToBeanServer(options); +List queries = options.queries(); +boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern); + +Set found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries); +Map numExpectedAttributes = +findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found); + +
[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1085675650 ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. 
+ * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * NOTE: The function name is not strictly speaking correct anymore Review Comment: What about maybePrintHelpOrVersion? I think the exit part is kind of implicit when you ask for help or version.
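The `maybePrintHelpOrVersion` helper being discussed would combine the two checks quoted above. The sketch below reduces the option handling to plain argument inspection so it runs standalone; the real code uses joptsimple via `CommandDefaultOptions`, and `CliSketch` is an invented name.

```java
// Sketch of the maybePrintHelpOrVersion idea: check --help (or no args)
// first, then --version, and report which message would be printed
// before the tool exits.
public class CliSketch {
    static boolean isPrintHelpNeeded(String[] args) {
        if (args.length == 0) return true;
        for (String a : args) if (a.equals("--help")) return true;
        return false;
    }

    static boolean isPrintVersionNeeded(String[] args) {
        for (String a : args) if (a.equals("--version")) return true;
        return false;
    }

    // Returns the message that would be printed before exiting, or null
    // when neither help nor version was requested.
    static String maybePrintHelpOrVersion(String[] args, String usage, String version) {
        if (isPrintHelpNeeded(args)) return usage;
        if (isPrintVersionNeeded(args)) return version;
        return null;
    }

    public static void main(String[] args) {
        assert maybePrintHelpOrVersion(new String[]{}, "usage", "3.5.0").equals("usage");
        assert maybePrintHelpOrVersion(new String[]{"--version"}, "usage", "3.5.0").equals("3.5.0");
        assert maybePrintHelpOrVersion(new String[]{"--topic", "t"}, "usage", "3.5.0") == null;
        System.out.println("ok");
    }
}
```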
[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
fvaleri commented on PR #13131: URL: https://github.com/apache/kafka/pull/13131#issuecomment-1402347644 > we have pushed Exit.exit(1) down into CommandLineUtils. @clolov it wasn't pushed down; that's the original logic. The code you are referring to in ZkSecurityMigrator.scala:103 is outdated. Now we are using ToolsUtils.printUsageAndDie in a few places for the reason expressed in the method comment. Please pull the latest changes and let me know.
[jira] [Assigned] (KAFKA-13999) Add ProducerCount metrics (KIP-847)
[ https://issues.apache.org/jira/browse/KAFKA-13999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya reassigned KAFKA-13999: Assignee: Anastasia Vela > Add ProducerCount metrics (KIP-847) > --- > > Key: KAFKA-13999 > URL: https://issues.apache.org/jira/browse/KAFKA-13999 > Project: Kafka > Issue Type: Improvement >Reporter: Artem Livshits >Assignee: Anastasia Vela >Priority: Minor > Fix For: 3.5.0 > > > See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerCount+metrics
[jira] [Updated] (KAFKA-13999) Add ProducerCount metrics (KIP-847)
[ https://issues.apache.org/jira/browse/KAFKA-13999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Divij Vaidya updated KAFKA-13999: - Fix Version/s: 3.5.0 > Add ProducerCount metrics (KIP-847) > --- > > Key: KAFKA-13999 > URL: https://issues.apache.org/jira/browse/KAFKA-13999 > Project: Kafka > Issue Type: Improvement >Reporter: Artem Livshits >Priority: Minor > Fix For: 3.5.0 > > > See > https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerCount+metrics
[jira] [Commented] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle
[ https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680275#comment-17680275 ] Bojan Blagojevic commented on KAFKA-14639: -- Thank you again for the quick response. I will try to answer your questions. h5. Answers: ??1. Can you check and/or provide all the logs from consumer-3 between gen 639 and gen 640? Is there anything in there about resetting the generation, dropping out of the group, resetting the member id, anything at all like that??? I don't see anything like that happening between gen 639 and gen 640. I attached the log excerpt related to a few surrounding generations [^consumers-jira.log]. ??2. The only other thing off the top of my head to check would be that every single consumer was configured with (only) the CooperativeStickyAssignor over the full period from gen 639 through the end of gen 640, or at least check the group leader (consumer-5 I think?) and consumers 3 & 4.?? The full logs are unfortunately expired but I am pretty sure that all the consumers were configured with only `CooperativeStickyAssignor`. They are part of a Kubernetes deployment in which all the pods share the configuration. I observed correct group leader behaviour when changing ownership of other partitions. I followed ownership changes when partition *partition-68* is moved. It belonged to the consumer partition-2-6b9db8686f-hswvn... in generation 639. {code:java} Final assignment of partitions to consumers: partition-2-6b9db8686f-hswvn-bbcfa7e4-7d5b-4227-ad62-99e8cc6e176f=[partition-20, partition-68] ... Finished assignment for group at generation 639: partition-2-6b9db8686f-hswvn-bbcfa7e4-7d5b-4227-ad62-99e8cc6e176f=Assignment(partitions=[partition-20, partition-68]) {code} Between generation 639 and generation 640 a new pod joins, pod-6b9db8686f-p87m9.
One of the Kafka consumers that belongs to this pod, partition-3-6b9db8686f-p87m9..., in generation 640 gets partition-68 assigned, and this is logged in AbstractStickyAssignor.constrainedAssign. {code:java} Final assignment of partitions to consumers: partition-3-6b9db8686f-p87m9-737d9359-daa5-4d89-9e4c-40a433aa8c6c=[partition-68]{code} Since this partition is changing ownership, it does not show up in the log of ConsumerCoordinator, which is expected. {code:java} Finished assignment for group at generation 640: partition-3-6b9db8686f-p87m9-737d9359-daa5-4d89-9e4c-40a433aa8c6c=Assignment(partitions=[]){code} And it gets assigned in generation 641: {code:java} Final assignment of partitions to consumers: partition-3-6b9db8686f-psfx4-5dd12c98-e698-44aa-9131-56281e798369=[partition-68] ... Finished assignment for group at generation 641: partition-3-6b9db8686f-psfx4-5dd12c98-e698-44aa-9131-56281e798369=Assignment(partitions=[partition-68]){code} It gets assigned to a consumer which is different from the one determined in generation 640 but this does not break the rebalance barrier. h5. Additional notes Not sure if it matters. I saw several consumers logging: {code:java} ... org.apache.kafka.clients.Metadata ... Resetting the last seen epoch of partition partition-74 to 149 since the associated topicId changed from null to nixqUZnpQYWjY0RreaCczA" {code} I think that the HB thread (I am assuming this based on the [code I've read|https://github.com/apache/kafka/blob/3.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1193]) is requesting join on behalf of consumers.
Again, not sure if it matters: {code:java} "2022-12-14 11:17:48 1 --- [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id-random_hash, groupId=my-group-id] (Re-)joining group" "2022-12-14 11:17:48 1 --- [consumer-2] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-2-my-client-id-my-group-id-random_hash, groupId=my-group-id] (Re-)joining group" "2022-12-14 11:17:48 1 --- [my-group-id] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-4-my-client-id-my-group-id-random_hash, groupId=my-group-id] Request joining group due to: group is already rebalancing" "2022-12-14 11:17:48 1 --- [my-group-id] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-2-my-client-id-my-group-id-random_hash, groupId=my-group-id] Request joining group due to: group is already rebalancing" "2022-12-14 11:17:47 1 --- [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-3-my-client-id-my-group-id-random_hash, groupId=my-group-id] (Re-)joining group" "2022-12-14 11:17:47 1 --- [consumer-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=partition-1-my-client-id-my-group-id-random_hash, groupId=my-group-id] (Re-)joining group" "2022-12-14 11:17:47 1 --- [consumer-0] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer
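For reference, the setup the reporter describes (all consumers configured with only the cooperative assignor) is selected through the standard `partition.assignment.strategy` consumer property. The snippet below only builds the configuration as a `Properties` object so it runs without the Kafka client on the classpath; `AssignorConfig` and `consumerProps` are illustrative names.

```java
import java.util.Properties;

// Builds the consumer configuration described in the report: a group of
// consumers that all use (only) the CooperativeStickyAssignor.
public class AssignorConfig {
    static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("group.id", groupId);
        props.put("partition.assignment.strategy",
                  "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProps("my-group-id");
        assert p.getProperty("partition.assignment.strategy").endsWith("CooperativeStickyAssignor");
        System.out.println(p);
    }
}
```

Mixing assignors across members of the same group is exactly the misconfiguration the earlier question was probing for, since all members must agree on a rebalance protocol.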
[jira] [Updated] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle
[ https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bojan Blagojevic updated KAFKA-14639: - Attachment: consumers-jira.log > Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance > cycle > > > Key: KAFKA-14639 > URL: https://issues.apache.org/jira/browse/KAFKA-14639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.2.1 >Reporter: Bojan Blagojevic >Priority: Major > Attachments: consumers-jira.log > > > I have an application that runs 6 consumers in parallel. I am getting some > unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I > understand the mechanism correctly, if the consumer loses a partition in one > rebalance cycle, the partition should be assigned in the next rebalance cycle. > This assumption is based on the > [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html] > documentation and a few blog posts that describe the protocol, like [this > one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/] > on the Confluent blog. > {quote}The assignor should not reassign any owned partitions immediately, but > instead may indicate consumers the need for partition revocation so that the > revoked partitions can be reassigned to other consumers in the next rebalance > event. This is designed for sticky assignment logic which attempts to > minimize partition reassignment with cooperative adjustments. > {quote} > {quote}Any member that revoked partitions then rejoins the group, triggering > a second rebalance so that its revoked partitions can be assigned. Until > then, these partitions are unowned and unassigned. > {quote} > These are the logs from the application that uses > {{{}protocol='cooperative-sticky'{}}}. 
In the same rebalance cycle > ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to > {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 > consumers. > Mind that the log is in reverse(bottom to top) > {code:java} > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New > partition assignment: partition-59, seek to min common offset: 85120524 > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions > [partition-59] assigned successfully > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions > assigned: [partition-59] > 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-3-my-client-id-my-group-id, > groupId=my-group-id] Adding newly assigned partitions: partition-59 > 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-3-my-client-id-my-group-id, > groupId=my-group-id] Notifying assignor about the new > Assignment(partitions=[partition-59]) > 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-3-my-client-id-my-group-id, > groupId=my-group-id] Request joining group due to: need to revoke partitions > [partition-26, partition-74] as indicated by the current assignment and > re-join > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions > [partition-26, partition-74] revoked successfully > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished > removing partition data > 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-4-my-client-id-my-group-id, > groupId=my-group-id] (Re-)joining group > 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New > partition assignment: partition-74, seek to min common offset: 107317730 > 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : 
Partitions > [partition-74] assigned successfully > 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions > assigned: [partition-74] > 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-4-my-client-id-my-group-id, > groupId=my-group-id] Adding newly assigned partitions: partition-74 > 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-4-my-client-id-my-group-id, > groupId=my-group-id] Notifying assignor about the new > Assignment(partitions=[partition-74]) > 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-4-my-client-id-my-group-id, > groupId=my-group-id] Request joining group due to: need to revoke partitions >
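The two-cycle behavior the reporter expected can be modeled with a small stdlib-only sketch. This is illustrative only, not the assignor's actual code; all class and method names below are invented for the example:

```java
import java.util.*;

// Toy model of the cooperative-sticky contract: a partition that changes
// owner is revoked in one rebalance and only assigned to its new owner in a
// follow-up rebalance (illustrative, not Kafka internals).
class CooperativeRebalanceSketch {

    // Rebalance N: every consumer keeps only the partitions it will still own;
    // everything else is revoked and becomes unowned.
    static Map<String, Set<Integer>> revocationPhase(Map<String, Set<Integer>> owned,
                                                     Map<String, Set<Integer>> target) {
        Map<String, Set<Integer>> next = new HashMap<>();
        for (Map.Entry<String, Set<Integer>> e : owned.entrySet()) {
            Set<Integer> keep = new HashSet<>(e.getValue());
            keep.retainAll(target.getOrDefault(e.getKey(), Set.of()));
            next.put(e.getKey(), keep);
        }
        return next;
    }

    // Rebalance N+1: the revoked (now unowned) partitions can be handed out.
    static Map<String, Set<Integer>> assignmentPhase(Map<String, Set<Integer>> target) {
        return target;
    }
}
```

Under this model, partition 74 would be unowned after the revocation rebalance and only land on consumer-4 in the next one, whereas the logs above show both the revocation and the reassignment inside generationId=640.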
[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables
viktorsomogyi commented on code in PR #12992: URL: https://github.com/apache/kafka/pull/12992#discussion_r1085559673 ## clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java: ## @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.config.provider; + +import org.apache.kafka.common.config.ConfigData; +import org.apache.kafka.common.config.ConfigException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +public class EnvVarConfigProvider implements ConfigProvider { +private final Map envVarMap; + +public EnvVarConfigProvider() { +envVarMap = getEnvVars(); +} + +public EnvVarConfigProvider(Map envVarsAsArgument) { +envVarMap = envVarsAsArgument; +} + +private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class); + +@Override +public void configure(Map configs) { +} + +@Override +public void close() throws IOException { +} + +/** + * @param s unused + * @return returns environment variables as configuration + */ +@Override +public ConfigData get(String s) { +return get(s, null); +} + +/** + * @param s unused + * @param keys the keys whose values will be retrieved. + * @return the configuration data. + */ +@Override +public ConfigData get(String s, Set keys) { Review Comment: It may be a bit more defensive if we validate that the given path is either empty or null; otherwise we don't support it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
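The defensive check the reviewer suggests could look roughly like the following. This is a stdlib-only sketch with an invented class name; the real provider would presumably throw Kafka's `ConfigException` rather than `IllegalArgumentException`:

```java
// Sketch of the review suggestion: the env-var provider has no notion of
// paths, so any non-empty path is likely a misconfiguration and should be
// rejected rather than silently ignored (illustrative code only).
class EnvVarPathCheckSketch {
    static void checkPath(String path) {
        if (path != null && !path.isEmpty()) {
            throw new IllegalArgumentException(
                    "EnvVarConfigProvider does not support paths, got: " + path);
        }
    }
}
```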
[GitHub] [kafka] nizhikov commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module
nizhikov commented on PR #13157: URL: https://github.com/apache/kafka/pull/13157#issuecomment-1402195231 @mimaison Thanks for the review. What about the *Config classes, like `MirrorCheckpointConfig` or `MirrorConnectorConfig`? Are they part of the public API? Should I move them to mirror-client?
[GitHub] [kafka] mimaison commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module
mimaison commented on PR #13157: URL: https://github.com/apache/kafka/pull/13157#issuecomment-1402188145 @nizhikov Thanks for looking at this issue. I don't think we can simply enable the javadoc on the `connect:mirror` project as it will also include all public classes from that module that are not part of the public API. We only want to include the interfaces users can implement. We should be able to use a filter to select the classes to include. Otherwise I wonder if we could move the classes we want to the mirror-client module. The package should be the same, and as mirror depends on it, it should hopefully not break anything.
[GitHub] [kafka] mimaison commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes
mimaison commented on code in PR #13131: URL: https://github.com/apache/kafka/pull/13131#discussion_r1085116578 ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. + */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. 
+ * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + * from command line, if `--version` is specified on the command line + * print version information and exit. + * NOTE: The function name is not strictly speaking correct anymore Review Comment: Should we take this opportunity to update this name? ## server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java: ## @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.util; + +import joptsimple.OptionParser; +import joptsimple.OptionSet; +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.Set; + +/** + * Helper functions for dealing with command line utilities. 
+ */ +public class CommandLineUtils { +/** + * Check if there are no options or `--help` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt); +} + +/** + * Check if there is `--version` option from command line. + * + * @param commandOpts Acceptable options for a command + * @return true on matching the help check condition + */ +public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) { +return commandOpts.options.has(commandOpts.versionOpt); +} + +/** + * Check and print help message if there is no options or `--help` option + *
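The two predicates being moved can be modeled against a raw argument array. This is a stdlib-only sketch with an invented class name; the real `CommandLineUtils` consults joptsimple's `OptionSet` via `CommandDefaultOptions`, not a `String[]`:

```java
import java.util.Arrays;

// Sketch of the two checks from CommandLineUtils: help is wanted when there
// are no arguments or --help was passed; version is wanted when --version
// was passed (illustrative, argument-array based).
class CliChecksSketch {
    static boolean isPrintHelpNeeded(String[] args) {
        return args.length == 0 || Arrays.asList(args).contains("--help");
    }

    static boolean isPrintVersionNeeded(String[] args) {
        return Arrays.asList(args).contains("--version");
    }
}
```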
[GitHub] [kafka] MPeli commented on pull request #6329: KAFKA-1194: Fix renaming open files on Windows
MPeli commented on PR #6329: URL: https://github.com/apache/kafka/pull/6329#issuecomment-1402091415 Hi, I have created a new pull request. See #12331
[jira] [Updated] (KAFKA-14565) Fix Interceptor Resource Leakage
[ https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Terry Beard updated KAFKA-14565: Summary: Fix Interceptor Resource Leakage (was: Improve Interceptor Resource Leakage Prevention) > Fix Interceptor Resource Leakage > > > Key: KAFKA-14565 > URL: https://issues.apache.org/jira/browse/KAFKA-14565 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Terry Beard >Assignee: Terry Beard >Priority: Major > Fix For: 3.5.0 > > > The Consumer and Producer interceptor interfaces and their corresponding > Kafka Consumer and Producer constructors do not adequately support cleanup of > underlying interceptor resources. > Currently within the Kafka Consumer and Kafka Producer constructors, the > *AbstractConfig.getConfiguredInstances()* is delegated responsibility for > both creating and configuring each interceptor listed in the > interceptor.classes property and returns a configured > *List>* interceptors. > This dual responsibility for both creation and configuration is problematic > when it involves multiple interceptors where at least one interceptor's > configure method implementation creates and/or depends on objects which > creates threads, connections or other resources which requires clean up and > the subsequent interceptor's configure method raises a runtime exception. > This raising of the runtime exception produces a resource leakage in the > first interceptor as the interceptor container i.e. > ConsumerInterceptors/ProducerInterceptors is never created and therefore the > first interceptor's and really any interceptor's close method are never > called. 
> To help ensure the respective container interceptors are able to invoke their > respective interceptor close methods for proper resource clean up, I propose > two approaches: > +*PROPOSAL 1*+ > Define a default *open* or *configureWithResources()* or *acquireResources()* > method with no implementation and a checked exception on the respective > Consumer/Producer interceptor interfaces. This method, as a part of the > interceptor life cycle management, will be responsible for creating threads > and/or objects which utilize threads, connections or other resources which > require clean up. Additionally, this default method enables implementation > optionality, as its empty default behavior means it will do nothing when > unimplemented, mitigating backwards compatibility impact to existing > interceptors. Finally, the Kafka Consumer/Producer Interceptor containers > will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* > or *maybeAcquireResources* method which also throws a checked exception. > See below code excerpt for the Consumer/Producer constructor: > {code:java} > List> interceptorList = (List) > config.getConfiguredInstances( > ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, > ConsumerInterceptor.class, > Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)); > this.interceptors = new ConsumerInterceptors<>(interceptorList); > this.interceptors.maybeConfigureWithResources(); > {code} > +*PROPOSAL 2*+ > To avoid changing any public interfaces and the subsequent KIP process, we can > * Create a class which inherits or wraps AbstractConfig that contains a new > method which will return a ConfiguredInstanceResult class. This > ConfiguredInstanceResult class will contain an optional list of successfully > created interceptors and/or the exception which occurred while calling each > Interceptor::configure. Additionally, it will contain a helper method to > rethrow an exception as well as a method which returns the underlying > exception. 
The caller is expected to handle the exception and perform clean > up e.g. call Interceptor::close on each interceptor in the list provided by > the ConfiguredInstanceResult class. > * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} > instances if/when a failure occurs > * Add a new overloaded {{getConfiguredInstance}} / > {{getConfiguredInstances}} variant that allows users to specify whether > already-instantiated classes should be closed or not when a failure occurs > * Add a new exception type to the public API that includes a list of all of > the successfully-instantiated (and/or successfully-configured) instances > before the error was encountered so that callers can choose how to handle the > failure however they want (and possibly so that instantiation/configuration > can be attempted on every class before throwing the exception) > > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
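The clean-up behavior the ticket asks for, regardless of which proposal is chosen, boils down to configuring interceptors one at a time and closing the already-configured ones when a later `configure` fails. A minimal stdlib-only sketch, with invented names that are not the proposed Kafka API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative interceptor with a configure step that may fail and a close
// step that releases resources (not the real Kafka interceptor interfaces).
interface SketchInterceptor extends AutoCloseable {
    void configure() throws Exception;
    @Override
    void close();
}

class InterceptorContainerSketch {
    // Configure interceptors in order; on failure, close every interceptor
    // that was already configured so its resources are not leaked, then rethrow.
    static void configureAll(List<SketchInterceptor> interceptors) throws Exception {
        List<SketchInterceptor> configured = new ArrayList<>();
        for (SketchInterceptor i : interceptors) {
            try {
                i.configure();
                configured.add(i);
            } catch (Exception e) {
                for (SketchInterceptor c : configured) c.close();
                throw e;
            }
        }
    }
}
```

This is exactly the leak scenario from the description: today, when the second interceptor's `configure` raises, the container is never created, so the first interceptor's `close` is never invoked.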
[jira] [Commented] (KAFKA-14487) Move LogManager to storage module
[ https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680230#comment-17680230 ] Sagar Rao commented on KAFKA-14487: --- [~ijuma] , is it ok if I can pick this one up? Seems like it's not blocked by any other activities that you all are working on? Plz let me know. Thanks > Move LogManager to storage module > - > > Key: KAFKA-14487 > URL: https://issues.apache.org/jira/browse/KAFKA-14487 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Priority: Major >
[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance
vamossagar12 commented on PR #12561: URL: https://github.com/apache/kafka/pull/12561#issuecomment-1402008138 @C0urante , thanks for your response. Makes sense.
[GitHub] [kafka] ruslankrivoshein commented on pull request #13074: MINOR: upgrade.from is revealed for Upgrade doc
ruslankrivoshein commented on PR #13074: URL: https://github.com/apache/kafka/pull/13074#issuecomment-1401965227 @hachikuji, what do you think about this tiny edit?
[GitHub] [kafka] divijvaidya commented on pull request #13111: KAFKA-14190: Update Zk TopicId from locally stored cache in controller
divijvaidya commented on PR #13111: URL: https://github.com/apache/kafka/pull/13111#issuecomment-1401824565 @dajac > Will this code still be around by the time tiered storage is completed? I don't know but my point is that this code change is simple and safe enough to add it to the current code as of today. @jolshan > My other concern here is that even though this fixes the issue in the case where the controller stays the same, it doesn't cover controller re-election. This means we would still have to share and support the recovery methods. If this is a big issue for tiered storage, then we could still be in trouble. To be very precise here, this fix won't work, if the controller context does not have the old topic Id. It will only happen when controller failover took place exactly between the duration when admin overwrote Zk and controller. Note that controller failover during all other time will work fine (since controller will recreate controller context from Zk which would have been updated with oldTopicId earlier). And yes, I agree this is not a 100% fix but it's a start. Since, it's a safe fix and doesn't have side effects, we should push it out. > Also curious if we can upload a segment with the wrong ID if the leader and ISR request is blocked (and thus can't become a leader or follower) Great question! The topic Id mismatch check [during handling of LISR request](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1495) is based on matching the local topic Id in the broker with the one that is sent with LISR. However, it's very much possible to not have any topicId locally. As an example, let's say the partition reassignment leads to partition placement on a broker where log hasn't been created so far. In such cases, LISR won't throw a topic mismatch error and it won't be blocked. Instead it will start operating with new topic Id. 
Now, we will have some followers working with old topic Id (where LISR was blocked) and some with new topic Id. If a failover happens to the one with new topic Id, it will start uploading segments to tiered storage with new topic Id and thus, for the same topic partition, we will have segments with old topic Id as well as new topic Id.
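The acceptance gap described in this thread, where a broker with no local topic Id adopts whatever Id arrives in a LeaderAndIsr request, reduces to a one-line predicate. This is an illustrative sketch, not the `ReplicaManager` code, and it uses plain strings in place of Kafka's `Uuid` type:

```java
import java.util.Optional;

// Sketch of the LISR topic-id check discussed above: a request is rejected
// only when the broker already has a local topic id that differs from the
// request's; a broker with no local id (e.g. after reassignment onto a
// broker with no log yet) accepts any id (illustrative only).
class TopicIdCheckSketch {
    static boolean accepts(Optional<String> localTopicId, String requestTopicId) {
        return localTopicId.isEmpty() || localTopicId.get().equals(requestTopicId);
    }
}
```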
[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
vamossagar12 commented on PR #13158: URL: https://github.com/apache/kafka/pull/13158#issuecomment-1401815286 @fvaleri , plz review this PR.
[GitHub] [kafka] vamossagar12 opened a new pull request, #13158: KAFKA-14647: Moving TopicFilter to server-common/utils
vamossagar12 opened a new pull request, #13158: URL: https://github.com/apache/kafka/pull/13158 Moving TopicFilter to server-common/utils
[GitHub] [kafka] divijvaidya commented on pull request #12331: KAFKA-1194: changes needed to run on Windows
divijvaidya commented on PR #12331: URL: https://github.com/apache/kafka/pull/12331#issuecomment-1401813587 @sjetha you should probably send an email to the [d...@kafka.apache.org](mailto:d...@kafka.apache.org) mailing list, explaining the urgency and asking for someone to take a look at this.
[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)
[ https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680181#comment-17680181 ] Jochen Schalanda commented on KAFKA-14646: -- Could it still be that the check in [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99] is too restrictive and should read *{{record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION}}* instead? > SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> > 3.3.2) > > > Key: KAFKA-14646 > URL: https://issues.apache.org/jira/browse/KAFKA-14646 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 > Environment: Kafka Streams 3.2.3 (before update) > Kafka Streams 3.3.2 (after update) > Java 17 (Eclipse Temurin 17.0.5), Linux x86_64 >Reporter: Jochen Schalanda >Priority: Major > > Hey folks, > > we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and > started getting the following exceptions: > {code:java} > org.apache.kafka.common.errors.UnsupportedVersionException: > SubscriptionWrapper is of an incompatible version. {code} > After swiftly looking through the code, this exception is potentially thrown > in two places: > * > [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78] > ** Here the check was changed in Kafka 3.3.x: > [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12] > * > [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99] > ** Here the check wasn't changed. > > Is it possible that the second check in > {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten? 
> > Any hints how to resolve this issue without a downgrade? > Since this only affects 2 of 15 topologies in the application, I'm hesitant > to just downgrade to Kafka 3.2.3 again since the internal topics might > already have been updated to use the "new" version of > {{{}SubscriptionWrapper{}}}. > > Related discussion in the Confluent Community Slack: > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119] > h2. Stack trace > {code:java} > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=1_8, > processor=XXX-joined-changed-fk-subscription-registration-source, > topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, > partition=8, offset=12297976, > stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: > SubscriptionWrapper is of an incompatible version. > at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
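The difference between the two checks Jochen points at can be made concrete with plain ints. This is a sketch with invented names; `SubscriptionWrapper`'s real version constants and serde are not used here:

```java
// Sketch of the two compatibility checks discussed in this ticket.
class VersionCheckSketch {
    // Strict check (as in SubscriptionStoreReceiveProcessorSupplier today):
    // any version other than the reader's current one is rejected, so records
    // written by an *older* Streams version fail during a rolling upgrade.
    static boolean strictRejects(int recordVersion, int currentVersion) {
        return recordVersion != currentVersion;
    }

    // Suggested check: only versions *newer* than the reader understands are
    // rejected; older-format records remain readable.
    static boolean lenientRejects(int recordVersion, int currentVersion) {
        return recordVersion > currentVersion;
    }
}
```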
[GitHub] [kafka] nizhikov opened a new pull request, #13157: KAFKA-14599 Enable javadoc for :connect:mirror module
nizhikov opened a new pull request, #13157: URL: https://github.com/apache/kafka/pull/13157 Currently, the javadoc task is disabled for the `:connect:mirror` module. This PR enables it. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic
yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1084960389 ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -712,8 +733,16 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo } private void sendPrivileged(String key, byte[] value) { +sendPrivileged(key, value, null); +} + +private void sendPrivileged(String key, byte[] value, Callback callback) { if (!usesFencableWriter) { -configLog.send(key, value); Review Comment: Yeah, that's right, the API response will only include the generic top level message. > I think it'd be nice to include more detail on the cause of the failure I strongly agree, and this was discussed in some more detail on the [other thread](https://github.com/apache/kafka/pull/12984#discussion_r1064077119). > We wouldn't be making it more vague. The message would state that the write to the config topic failed which is the cause for failure. Since the exception mapper used by Connect's REST server only writes the [top level exception's message](https://github.com/apache/kafka/blob/d798ec779c25dba31fa5ee9384d159ed54c6e07b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L72) to the response (i.e. nested exceptions aren't surfaced via the REST API response), I think it makes sense to keep the top level exception's message generic and allow users to debug further via the worker logs (where the entire exception chain's stack trace will be visible). ... The reasoning here is that since a Connect user may not even know that Connect uses a producer under the hood to write certain requests to the config topic for asynchronous processing, it would make more sense to have an informative Connect specific exception message rather than directly throwing the producer exception which may or may not contain enough details to be relevant to a Connect user. 
> Another option for the above issue could be changing the exception mapper to concatenate all the exception messages from the exception chain. > Yet another option for this could be to simply append a "Check the worker logs for more details on the error" to the top level exception's message in the REST API response (the worker logs will have the entire exception chain). Thoughts? What do you think about modifying the exception mapper to be more informative (either in this PR or a separate one)? ## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java: ## @@ -711,9 +742,9 @@ KafkaBasedLog setupAndCreateKafkaBasedLog(String topic, final Wo return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier); } -private void sendPrivileged(String key, byte[] value) { +private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException { if (!usesFencableWriter) { -configLog.send(key, value); +configLog.send(key, value).get(); Review Comment: Thanks Chris, both great points. The `get` without timeout here was definitely a miss on my part. I've addressed both of your raised concerns in the latest patch (including batching multiple sends in a single transaction for the EOS enabled case).
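The "concatenate all the exception messages from the exception chain" option floated in this thread can be sketched with the stdlib alone. The class name is invented and Connect's actual `ConnectExceptionMapper` is not modified here:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: walk the cause chain and join every message, so a REST response
// built from only the top-level exception still surfaces the root cause.
class ExceptionChainSketch {
    static String chainedMessage(Throwable t) {
        List<String> parts = new ArrayList<>();
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur.getMessage() != null) parts.add(cur.getMessage());
        }
        return String.join(": ", parts);
    }
}
```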
[jira] [Comment Edited] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)
[ https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680143#comment-17680143 ] Jochen Schalanda edited comment on KAFKA-14646 at 1/24/23 8:42 AM: --- Unfortunately the two rolling updates (with {{upgrade.from="3.2"}} and then removing the setting again) didn't help. We still see the same exception: {code:java} org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper is of an incompatible version. {code} [~mjsax] Do you have any hints how to resolve this issue? We see it in only 2 topologies out of 15 and I'm afraid that downgrading to Kafka Streams 3.2.3 will break something else now. > SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> > 3.3.2) > > > Key: KAFKA-14646 > URL: https://issues.apache.org/jira/browse/KAFKA-14646 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.3.2 > Environment: Kafka Streams 3.2.3 (before update) > Kafka Streams 3.3.2 (after update) > Java 17 (Eclipse Temurin 17.0.5), Linux x86_64 >Reporter: Jochen Schalanda >Priority: Major > > Hey folks, > > we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and > started getting the following exceptions: > {code:java} > org.apache.kafka.common.errors.UnsupportedVersionException: > SubscriptionWrapper is of an incompatible version. 
{code} > After swiftly looking through the code, this exception is potentially thrown > in two places: > * > [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78] > ** Here the check was changed in Kafka 3.3.x: > [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12] > * > [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99] > ** Here the check wasn't changed. > > Is it possible that the second check in > {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten? > > Any hints how to resolve this issue without a downgrade? > Since this only affects 2 of 15 topologies in the application, I'm hesitant > to just downgrade to Kafka 3.2.3 again since the internal topics might > already have been updated to use the "new" version of > {{{}SubscriptionWrapper{}}}. > > Related discussion in the Confluent Community Slack: > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119] > h2. Stack trace > {code:java} > org.apache.kafka.streams.errors.StreamsException: Exception caught in > process. taskId=1_8, > processor=XXX-joined-changed-fk-subscription-registration-source, > topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, > partition=8, offset=12297976, > stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: > SubscriptionWrapper is of an incompatible version. 
> at > org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) > at > org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) > at > org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182) > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770) > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588) > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
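For context on the {{upgrade.from}} procedure mentioned in the comment above, here is a minimal plain-Java sketch of the documented two-bounce upgrade configuration. The literal keys mirror the Kafka Streams config names ({{upgrade.from}} corresponds to StreamsConfig.UPGRADE_FROM_CONFIG), and the application id is a placeholder.

```java
import java.util.Properties;

// Sketch of the two rolling-bounce upgrade procedure for Kafka Streams,
// using literal config keys; "my-app" is a placeholder application id.
public class UpgradeFromExample {

    // First rolling bounce: run the new (3.3.x) binaries with upgrade.from
    // set to the version being upgraded from, so instances keep writing the
    // old wire format until the whole group has been upgraded.
    static Properties firstBounceConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "my-app");
        props.setProperty("upgrade.from", "3.2"); // StreamsConfig.UPGRADE_FROM_CONFIG
        return props;
    }

    // Second rolling bounce: remove upgrade.from so instances switch over
    // to the new wire format.
    static Properties secondBounceConfig() {
        Properties props = firstBounceConfig();
        props.remove("upgrade.from");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(firstBounceConfig());
        System.out.println(secondBounceConfig());
    }
}
```

The reporter above followed exactly this procedure and still hit the exception, which is what makes the second, unchanged version check in {{SubscriptionStoreReceiveProcessorSupplier}} suspicious.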
[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)
[ https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680143#comment-17680143 ] Jochen Schalanda commented on KAFKA-14646: -- Unfortunately the two rolling updates (with {{upgrade.from="3.2"}} and then removing the setting again) didn't help. We still see the same exception: {code:java} org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper is of an incompatible version. {code} [~mjsax] Do you have any hints how to resolve this issue? We see it in only 2 topologies out of 15 and I'm afraid that downgrading to Kafka Streams 3.2.3 will break something else now.
[jira] [Updated] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)
[ https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jochen Schalanda updated KAFKA-14646: - Description: Hey folks, we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and started getting the following exceptions: {code:java} org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper is of an incompatible version. {code} After swiftly looking through the code, this exception is potentially thrown in two places: * [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78] ** Here the check was changed in Kafka 3.3.x: [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12] * [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99] ** Here the check wasn't changed. Is it possible that the second check in {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten? Any hints how to resolve this issue without a downgrade? Since this only affects 2 of 15 topologies in the application, I'm hesitant to just downgrade to Kafka 3.2.3 again since the internal topics might already have been updated to use the "new" version of {{{}SubscriptionWrapper{}}}. Related discussion in the Confluent Community Slack: [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119] h2. Stack trace {code:java} org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_8, processor=XXX-joined-changed-fk-subscription-registration-source, topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, partition=8, offset=12297976, stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper is of an incompatible version. 
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750) at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182) at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550) {code} was: Hey folks, we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and started getting the following exceptions: {code:java} org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper is of an incompatible version. {code} After swiftly looking through the code, this exception is potentially thrown in two places: * [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78] ** Here the check was changed in Kafka 3.3.x: [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12] * [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99] ** Here the check wasn't changed. Is it possible that the second check in {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten? Any hints how to resolve this issue without a downgrade? Since this only affects 2 of 20+ topologies in the application, I'm hesitant to just downgrade to Kafka 3.2.3 again since the internal topics might already have been updated to use the "new" version of {{{}SubscriptionWrapper{}}}. 
Related discussion in the Confluent Community Slack: [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119] h2. Stack trace {code:java} org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=1_8, processor=XXX-joined-changed-fk-subscription-registration-source, topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, partition=8, offset=12297976, stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper is of an incompatible version. at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750) at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100) at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81) at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182) at
[jira] [Commented] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle
[ https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680141#comment-17680141 ] A. Sophie Blee-Goldman commented on KAFKA-14639: Thanks for the additional logs, that does indeed verify that both consumers participated in the same rebalance. My next guess would be that for some reason, consumer-3 had "lost" its partitions prior to joining the group in gen 640, in which case they would be allowed to be freely given away to another consumer in that same generation. Can you check and/or provide all the logs from consumer-3 between gen 639 and gen 640? Is there anything in there about resetting the generation, dropping out of the group, resetting the member id, anything at all like that? I also notice that the assignment changes drastically between gen 639 and 640; it's not "sticky" at all, which should have been easy for the assignor to do if the previous assignment was something relatively simple like each consumer claiming exactly one or two partitions (only) and all from the same topic. Something fishy is definitely going on. The only other thing off the top of my head to check would be that every single consumer was configured with (only) the CooperativeStickyAssignor over the full period from gen 639 through the end of gen 640, or at least check the group leader (consumer-5 I think?) and consumers 3 & 4. I'll take a look at the assignor logic and see if anything jumps out at me on my end, but I have to say the complete lack of stickiness in the assignment from 639 to 640 is fairly perplexing and something I have never seen before with the CooperativeStickyAssignor. There have been some recent bugs related to rebalancing edge cases that have been fixed over the past few versions, so I may go back over those and see if anything was messed up by them. 
I did in fact discover one bug affecting rebalancing/assignment in the past few months which had been introduced by that series of fixes, so perhaps there is another. > Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance > cycle > > > Key: KAFKA-14639 > URL: https://issues.apache.org/jira/browse/KAFKA-14639 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.2.1 >Reporter: Bojan Blagojevic >Priority: Major > > I have an application that runs 6 consumers in parallel. I am getting some > unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I > understand the mechanism correctly, if the consumer loses a partition in one > rebalance cycle, the partition should be assigned in the next rebalance cycle. > This assumption is based on the > [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html] > documentation and few blog posts that describe the protocol, like [this > one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/] > on Confluent blog. > {quote}The assignor should not reassign any owned partitions immediately, but > instead may indicate consumers the need for partition revocation so that the > revoked partitions can be reassigned to other consumers in the next rebalance > event. This is designed for sticky assignment logic which attempts to > minimize partition reassignment with cooperative adjustments. > {quote} > {quote}Any member that revoked partitions then rejoins the group, triggering > a second rebalance so that its revoked partitions can be assigned. Until > then, these partitions are unowned and unassigned. > {quote} > These are the logs from the application that uses > {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle > ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to > {{{}consumer-4{}}}. 
I omitted the lines that are logged by the other 4 > consumers. > Mind that the log is in reverse(bottom to top) > {code:java} > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New > partition assignment: partition-59, seek to min common offset: 85120524 > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions > [partition-59] assigned successfully > 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions > assigned: [partition-59] > 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-3-my-client-id-my-group-id, > groupId=my-group-id] Adding newly assigned partitions: partition-59 > 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator > : [Consumer clientId=partition-3-my-client-id-my-group-id, > groupId=my-group-id] Notifying assignor about the new >
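One of the checks suggested in the comment above is that every group member runs (only) the CooperativeStickyAssignor for the whole window. As an editorial illustration, a minimal plain-Java sketch of that consumer configuration, using literal keys that mirror the ConsumerConfig constants:

```java
import java.util.Properties;

// Sketch of the consumer configuration every group member would need;
// the literal keys mirror ConsumerConfig constants.
public class CooperativeStickyConfig {

    static Properties consumerConfig(String groupId) {
        Properties props = new Properties();
        props.setProperty("group.id", groupId);
        // All members must agree on this assignor over the entire upgrade
        // window for cooperative rebalancing to behave as documented.
        props.setProperty("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfig("my-group-id"));
    }
}
```

If even one member still lists only the eager default assignor during the transition, partitions can be revoked and reassigned in a single rebalance, which resembles the behavior reported in this issue.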
[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools
fvaleri commented on PR #13136: URL: https://github.com/apache/kafka/pull/13136#issuecomment-1401539053 @mimaison @clolov @vamossagar12 this is ready for review if you have some time.
[GitHub] [kafka] ableegoldman commented on pull request #13156: KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest
ableegoldman commented on PR #13156: URL: https://github.com/apache/kafka/pull/13156#issuecomment-1401535688 Merged to trunk and cherrypicked back to 3.4 since this is trivial and rather time-sensitive -- hopefully we can finally narrow down the culprit for good and have a better shot at getting a full clean build of 3.4
[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
[ https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680134#comment-17680134 ] A. Sophie Blee-Goldman commented on KAFKA-14533: FYI after disabling the `false` parameter I immediately saw another failure, which points to the state updater as the culprit after all. I did one final PR to verify this by disabling the `true` build and enabling the `false` build again – https://github.com/apache/kafka/pull/13156 > Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance > - > > Key: KAFKA-14533 > URL: https://issues.apache.org/jira/browse/KAFKA-14533 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: Greg Harris >Priority: Major > Labels: flaky-test > > The SmokeTestDriverIntegrationTest appears to be flakey failing in recent > runs: > ``` > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/ > java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/ > 
java.util.concurrent.TimeoutException: > shouldWorkWithRebalance(boolean) timed out after 600 seconds > ``` > The stacktrace appears to be: > ``` > java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed > out after 600 seconds > at > org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29) > at > org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58) > at > org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > ... > Suppressed: java.lang.InterruptedException: sleep interrupted > at java.lang.Thread.sleep(Native Method) > at > org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45) > ... 134 more > ``` > The test appears to be timing out waiting for the SmokeTestClient to complete > its asynchronous close, and taking significantly longer to do so (600s > instead of 60s) than a typical local test execution time.
[GitHub] [kafka] ableegoldman merged pull request #13156: KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest
ableegoldman merged PR #13156: URL: https://github.com/apache/kafka/pull/13156
[GitHub] [kafka] ableegoldman commented on pull request #13156: KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest
ableegoldman commented on PR #13156: URL: https://github.com/apache/kafka/pull/13156#issuecomment-1401529084 cc @lucasbru @mjsax
[GitHub] [kafka] ableegoldman opened a new pull request, #13156: KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest
ableegoldman opened a new pull request, #13156: URL: https://github.com/apache/kafka/pull/13156 I immediately saw a failure with `stateUpdaterEnabled = true` after disabling the `false` parameter, which suggests the problem actually does lie in the state updater itself and not the act of parametrization of the test. To verify this theory, and help stabilize the 3.4 release branch, let's try one more test by swapping out the `true` build in favor of the `false` one. If the `listOffsets` requests stop failing and causing this integration test to hit the global timeout as is currently happening at such a high rate, then we have pretty good evidence pointing at the state updater and should be able to debug things more easily from there. After getting in a few builds to see whether the flakiness subsides, we should merge this PR to re-enable both parameters going forward: https://github.com/apache/kafka/pull/13155