[GitHub] [kafka] Playerharperb commented on pull request #13162: fix: replace an inefficient loop in kafka internals

2023-01-24 Thread via GitHub


Playerharperb commented on PR #13162:
URL: https://github.com/apache/kafka/pull/13162#issuecomment-1403125466

   Hello, my name is playerharp...@gmail.com and I am interested in this job 
posting on the website.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14651) Add connectorDeleted flag when stopping tasks

2023-01-24 Thread Hector Geraldino (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14651?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hector Geraldino updated KAFKA-14651:
-
Description: 
Jira ticket for 
[KIP-901|https://cwiki.apache.org/confluence/display/KAFKA/KIP-901%3A+Add+connectorDeleted+flag+when+stopping+tasks]

It would be useful for a Connector to know when its instance is being deleted. 
This will give connector tasks a chance to perform any cleanup routines 
as part of the connector removal process.

  was:It would be useful for a Connector to know when its instance is being 
deleted. This will give connectors a chance to perform any cleanup tasks 
(e.g. deleting external resources, or deleting offsets) before the connector is 
completely removed from the cluster.


> Add connectorDeleted flag when stopping tasks
> -
>
> Key: KAFKA-14651
> URL: https://issues.apache.org/jira/browse/KAFKA-14651
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Hector Geraldino
>Assignee: Hector Geraldino
>Priority: Minor
>
> Jira ticket for 
> [KIP-901|https://cwiki.apache.org/confluence/display/KAFKA/KIP-901%3A+Add+connectorDeleted+flag+when+stopping+tasks]
> It would be useful for a Connector to know when its instance is being deleted. 
> This will give connector tasks a chance to perform any cleanup routines 
> as part of the connector removal process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14651) Add connectorDeleted flag when stopping tasks

2023-01-24 Thread Hector Geraldino (Jira)
Hector Geraldino created KAFKA-14651:


 Summary: Add connectorDeleted flag when stopping tasks
 Key: KAFKA-14651
 URL: https://issues.apache.org/jira/browse/KAFKA-14651
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Hector Geraldino
Assignee: Hector Geraldino


It would be useful for a Connector to know when its instance is being deleted. 
This will give connectors a chance to perform any cleanup tasks (e.g. 
deleting external resources, or deleting offsets) before the connector is 
completely removed from the cluster.
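
For illustration, a minimal sketch of a sink task that could take advantage of 
such a flag; the stop(boolean) overload below is hypothetical, since KIP-901's 
final signature may differ.

```java
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class CleanupAwareSinkTask extends SinkTask {
    @Override public String version() { return "0.0.1"; }
    @Override public void start(Map<String, String> props) { }
    @Override public void put(Collection<SinkRecord> records) { }

    @Override
    public void stop() {
        // today there is no way to tell a restart/rebalance from a delete
    }

    // Hypothetical overload in the spirit of KIP-901 (not the final API):
    public void stop(boolean connectorDeleted) {
        if (connectorDeleted) {
            deleteExternalResources(); // e.g. drop offsets or external tables
        }
        stop();
    }

    private void deleteExternalResources() { /* illustrative no-op */ }
}
```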



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] rayokota commented on pull request #12126: KAFKA-8713 KIP-581: Add new conf serialize.accept.optional.null in connect-json

2023-01-24 Thread via GitHub


rayokota commented on PR #12126:
URL: https://github.com/apache/kafka/pull/12126#issuecomment-1403014501

   @blueberrysugarhigh , sorry I'm not the author of this PR and I haven't 
tried it myself.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] blueberrysugarhigh commented on pull request #12126: KAFKA-8713 KIP-581: Add new conf serialize.accept.optional.null in connect-json

2023-01-24 Thread via GitHub


blueberrysugarhigh commented on PR #12126:
URL: https://github.com/apache/kafka/pull/12126#issuecomment-1402996186

   > @rayokota sorry for bothering you. I'm trying to maintain my own version 
of `JsonConverter`. I just copied the code and pasted it into a new class 
`JsonConverter`, and I replaced the following line
   > 
   > 
https://github.com/a0x8o/kafka/blob/283d07f9d2577e78b7aab7a419818cf9cda19d0a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java#L664
   > 
   > to
   > 
   > ```java
   > obj.set(field.name(), this.convertToJson(field.schema(), 
struct.getWithoutDefault(field.name())));
   > ```
   > 
   > Then I changed my Debezium config to
   > 
   > ```
   > ...
   > "key.converter": "org.apache.kafka.connect.json.MyJsonConverter",
   > "key.converter.schemas.enabled": true,
   > "value.converter": "org.apache.kafka.connect.json.MyJsonConverter",
   > "value.converter.schemas.enabled": true,
   > ```
   > 
   > but it seems it still does not work. I still get the default value for a 
nullable field. Do you know why? What else do I need to do?
   
   Have you been able to make this work? I just ran into the same issue. I 
upgraded my Schema Registry version and tried using the configuration in the 
connector, but it was still showing the wrong data.
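
   For reference, a standalone sketch of the Struct API difference the patched 
line relies on: get() falls back to the schema default for unset fields, while 
getWithoutDefault() returns null.

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class DefaultValueDemo {
    public static void main(String[] args) {
        Schema schema = SchemaBuilder.struct()
            .field("flag", SchemaBuilder.bool().optional().defaultValue(true).build())
            .build();
        Struct struct = new Struct(schema); // "flag" is left unset

        System.out.println(struct.get("flag"));               // true (schema default)
        System.out.println(struct.getWithoutDefault("flag"));  // null
    }
}
```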


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680403#comment-17680403
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14533:


I'll give it a few more days before re-enabling both parameters, but so far 
I've got a few runs in with only the `false` parameter enabled and this seems 
to have fixed the flakiness.

Can't really envision why the state updater would or could cause the listOffsets 
request to fail in the way shown above, but it really does appear to be something 
about enabling it that so badly broke the SmokeTestDriverIntegrationTest.

Definitely need to look into this before we consider publicly releasing the 
state updater feature. cc [~cadonna] [~lbrutschy] [~guozhang] 

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  ... 134 more
> ```
> The test appears to be timing out while waiting for the SmokeTestClient to 
> complete its asynchronous close, which takes significantly longer (600s 
> instead of 60s) than a typical local test execution.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe merged pull request #13151: Fail broker and controller startup on authorizer failure

2023-01-24 Thread via GitHub


cmccabe merged PR #13151:
URL: https://github.com/apache/kafka/pull/13151


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-14650) IQv2 can throw ConcurrentModificationException when accessing Tasks

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-14650:
--

 Summary: IQv2 can throw ConcurrentModificationException when 
accessing Tasks 
 Key: KAFKA-14650
 URL: https://issues.apache.org/jira/browse/KAFKA-14650
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.4.0
Reporter: A. Sophie Blee-Goldman


From failure in *[PositionRestartIntegrationTest.verifyStore[cache=false, 
log=true, supplier=IN_MEMORY_WINDOW, 
kind=PAPI]|https://ci-builds.apache.org/job/Kafka/job/kafka/job/3.4/63/testReport/junit/org.apache.kafka.streams.integration/PositionRestartIntegrationTest/Build___JDK_11_and_Scala_2_13___verifyStore_cache_false__log_true__supplier_IN_MEMORY_WINDOW__kind_PAPI_/]*
java.util.ConcurrentModificationException
at 
java.base/java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1208)
at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1244)
at java.base/java.util.TreeMap$EntryIterator.next(TreeMap.java:1239)
at java.base/java.util.HashMap.putMapEntries(HashMap.java:508)
at java.base/java.util.HashMap.putAll(HashMap.java:781)
at 
org.apache.kafka.streams.processor.internals.Tasks.allTasksPerId(Tasks.java:361)
at 
org.apache.kafka.streams.processor.internals.TaskManager.allTasks(TaskManager.java:1537)
at 
org.apache.kafka.streams.processor.internals.StreamThread.allTasks(StreamThread.java:1278)
at org.apache.kafka.streams.KafkaStreams.query(KafkaStreams.java:1921)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.iqv2WaitForResult(IntegrationTestUtils.java:168)
at 
org.apache.kafka.streams.integration.PositionRestartIntegrationTest.shouldReachExpectedPosition(PositionRestartIntegrationTest.java:438)
at 
org.apache.kafka.streams.integration.PositionRestartIntegrationTest.verifyStore(PositionRestartIntegrationTest.java:423)
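
The failure mode is reproducible outside Kafka; a minimal standalone sketch, 
assuming the same unsynchronized copy-while-mutate pattern as 
Tasks.allTasksPerId():

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class CmeRepro {
    public static void main(String[] args) throws Exception {
        TreeMap<Integer, String> tasks = new TreeMap<>();
        for (int i = 0; i < 100_000; i++) tasks.put(i, "task-" + i);

        // concurrent structural modification while another thread copies
        Thread mutator = new Thread(() -> {
            for (int i = 0; i < 100_000; i++) tasks.remove(i);
        });
        mutator.start();

        try {
            // HashMap.putAll iterates the TreeMap's entry set; the iterator
            // is fail-fast and throws on concurrent structural changes
            Map<Integer, String> copy = new HashMap<>(tasks);
            System.out.println("copied " + copy.size() + " entries (no race this run)");
        } catch (java.util.ConcurrentModificationException e) {
            System.out.println("reproduced: " + e);
        }
        mutator.join();
    }
}
```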



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] guozhangwang commented on pull request #13160: chore: fix flaky DefaultStateUpdaterTest

2023-01-24 Thread via GitHub


guozhangwang commented on PR #13160:
URL: https://github.com/apache/kafka/pull/13160#issuecomment-1402827998

   LGTM!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang merged pull request #13160: chore: fix flaky DefaultStateUpdaterTest

2023-01-24 Thread via GitHub


guozhangwang merged PR #13160:
URL: https://github.com/apache/kafka/pull/13160


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14649) Failures instantiating Connect plugins hides other plugins from REST API, or crash worker

2023-01-24 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14649:

Description: 
Connect plugin path scanning evaluates the version() method of plugins to 
determine which version of a plugin to load, and what version to advertise as 
part of the REST API. This process involves reflectively constructing an 
instance of the class and calling the version method, which can fail in the 
following scenarios:

1. If a plugin throws an exception from a static initialization block
2. If a plugin does not have a default constructor (such as a non-static inner 
class)
3. If a plugin's default constructor is not public
4. If a plugin throws an exception from the default constructor
5. If a plugin's version method throws an exception

If any of the above is true for any single connector or rest extension on the 
classpath or plugin.path, the plugin path scanning will exit early, and 
potentially hide other unrelated plugins. This is primarily an issue in 
development and test environments, because these are easy-to-make code mistakes 
that would generally not make it to a release. Exceptions from the version 
method, however, can cause the worker to fail to start up as they are uncaught.

It is desirable for the worker to instead log these exceptions and continue. 
This will prevent one mis-implemented plugin from affecting other plugins, 
while still causing integration tests to fail against the plugin itself. We can 
augment logging to make it clear how to correct these failures, where before it 
was rather opaque and difficult to debug.
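
In code terms, the desired behavior is roughly the following sketch; Versioned 
is an illustrative stand-in for Connect's plugin types, not the actual scanner:

```java
import java.lang.reflect.Constructor;
import java.util.List;

public class PluginProbe {
    interface Versioned { String version(); }

    static void scan(List<Class<? extends Versioned>> pluginClasses) {
        for (Class<? extends Versioned> cls : pluginClasses) {
            try {
                // fails for non-static inner classes or missing default constructors
                Constructor<? extends Versioned> ctor = cls.getDeclaredConstructor();
                // surfaces static-initializer, access, and constructor exceptions
                Versioned plugin = ctor.newInstance();
                String version = plugin.version(); // may itself throw
                System.out.println(cls.getName() + " -> " + version);
            } catch (ReflectiveOperationException | RuntimeException | LinkageError e) {
                // log and continue so one broken plugin does not hide the rest
                System.err.println("skipping " + cls.getName() + ": " + e);
            }
        }
    }
}
```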

  was:
Connect plugin path scanning evaluates the version() method of plugins to 
determine which version of a plugin to load, and what version to advertise as 
part of the REST API. This process involves reflectively constructing an 
instance of the class and calling the version method, which can fail in the 
following scenarios:

1. If a plugin throws an exception from a static initialization block
2. If a plugin does not have a default constructor (such as a non-static inner 
class)
3. If a plugin's default constructor is not public
4. If a plugin throws an exception from the default constructor
5. If a plugin's version method throws an exception

If any of the above is true for any single connector or rest extension on the 
classpath or plugin.path, the plugin path scanning will exit early, and 
potentially hide other unrelated plugins. This is primarily an issue in 
development and test environments, because these are easy-to-make code mistakes 
that would generally not make it to a release. Exceptions from the version 
method, however, can cause the worker to throw

It is desirable for the worker to instead log these exceptions and continue. 
This will prevent one mis-implemented plugin from affecting other plugins, 
while still causing integration tests to fail against the plugin itself. We can 
augment logging to make it clear how to correct these failures, where before it 
was rather opaque and difficult to debug.


> Failures instantiating Connect plugins hides other plugins from REST API, or 
> crash worker
> -
>
> Key: KAFKA-14649
> URL: https://issues.apache.org/jira/browse/KAFKA-14649
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> Connect plugin path scanning evaluates the version() method of plugins to 
> determine which version of a plugin to load, and what version to advertise as 
> part of the REST API. This process involves reflectively constructing an 
> instance of the class and calling the version method, which can fail in the 
> following scenarios:
> 1. If a plugin throws an exception from a static initialization block
> 2. If a plugin does not have a default constructor (such as a non-static 
> inner class)
> 3. If a plugin's default constructor is not public
> 4. If a plugin throws an exception from the default constructor
> 5. If a plugin's version method throws an exception
> If any of the above is true for any single connector or rest extension on the 
> classpath or plugin.path, the plugin path scanning will exit early, and 
> potentially hide other unrelated plugins. This is primarily an issue in 
> development and test environments, because these are easy-to-make code 
> mistakes that would generally not make it to a release. Exceptions from the 
> version method, however, can cause the worker to fail to start up as they are 
> uncaught.
> It is desirable for the worker to instead log these exceptions and continue. 
> This will prevent one mis-implemented plugin from affecting other plugins, 
> while still causing integration tests to fail against the plugin itself. We 
> can augment logging to make it clear how to correct these failures, where 
> before it was rather opaque and difficult to debug.

[jira] [Updated] (KAFKA-14649) Failures instantiating Connect plugins hides other plugins from REST API, or crash worker

2023-01-24 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14649:

Summary: Failures instantiating Connect plugins hides other plugins from 
REST API, or crash worker  (was: Failures instantiating Connect plugins hides 
other plugins from REST API)

> Failures instantiating Connect plugins hides other plugins from REST API, or 
> crash worker
> -
>
> Key: KAFKA-14649
> URL: https://issues.apache.org/jira/browse/KAFKA-14649
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> Connect plugin path scanning evaluates the version() method of plugins to 
> determine which version of a plugin to load, and what version to advertise as 
> part of the REST API. This process involves reflectively constructing an 
> instance of the class and calling the version method, which can fail in the 
> following scenarios:
> 1. If a plugin throws an exception from a static initialization block
> 2. If a plugin does not have a default constructor (such as a non-static 
> inner class)
> 3. If a plugin's default constructor is not public
> 4. If a plugin throws an exception from the default constructor
> 5. If a plugin's version method throws an exception
> If any of the above is true for any single connector or rest extension on the 
> classpath or plugin.path, the plugin path scanning will exit early, and 
> potentially hide other unrelated plugins. This is primarily an issue in 
> development and test environments, because these are easy-to-make code 
> mistakes that would generally not make it to a release. Exceptions from the 
> version method, however, can cause the worker to throw
> It is desirable for the worker to instead log these exceptions and continue. 
> This will prevent one mis-implemented plugin from affecting other plugins, 
> while still causing integration tests to fail against the plugin itself. We 
> can augment logging to make it clear how to correct these failures, where 
> before it was rather opaque and difficult to debug.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14649) Failures instantiating Connect plugins hides other plugins from REST API

2023-01-24 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14649:

Description: 
Connect plugin path scanning evaluates the version() method of plugins to 
determine which version of a plugin to load, and what version to advertise as 
part of the REST API. This process involves reflectively constructing an 
instance of the class and calling the version method, which can fail in the 
following scenarios:

1. If a plugin throws an exception from a static initialization block
2. If a plugin does not have a default constructor (such as a non-static inner 
class)
3. If a plugin's default constructor is not public
4. If a plugin throws an exception from the default constructor
5. If a plugin's version method throws an exception

If any of the above is true for any single connector or rest extension on the 
classpath or plugin.path, the plugin path scanning will exit early, and 
potentially hide other unrelated plugins. This is primarily an issue in 
development and test environments, because these are easy-to-make code mistakes 
that would generally not make it to a release. Exceptions from the version 
method, however, can cause the worker to throw

It is desirable for the worker to instead log these exceptions and continue. 
This will prevent one mis-implemented plugin from affecting other plugins, 
while still causing integration tests to fail against the plugin itself. We can 
augment logging to make it clear how to correct these failures, where before it 
was rather opaque and difficult to debug.

  was:
Connect plugin path scanning evaluates the version() method of plugins to 
determine which version of a plugin to load, and what version to advertise as 
part of the REST API. This process involves reflectively constructing an 
instance of the class and calling the version method, which can fail in the 
following scenarios:

1. If a plugin throws an exception from a static initialization block
2. If a plugin does not have a default constructor (such as a non-static inner 
class)
3. If a plugin's default constructor is not public
4. If a plugin throws an exception from the default constructor
5. If a plugin's version method throws an exception

If any of the above is true for any single connector or rest extension on the 
classpath or plugin.path, the worker will fail to start up entirely. This is 
primarily an issue in development and test environments, because these are 
easy-to-make code mistakes that would generally not make it to a release.

It is desirable for the worker to instead log these exceptions and continue. 
This will prevent one mis-implemented plugin from affecting other plugins, 
while still causing integration tests to fail against the plugin itself. We can 
augment logging to make it clear how to correct these failures, where before it 
was rather opaque and difficult to debug.


> Failures instantiating Connect plugins hides other plugins from REST API
> 
>
> Key: KAFKA-14649
> URL: https://issues.apache.org/jira/browse/KAFKA-14649
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> Connect plugin path scanning evaluates the version() method of plugins to 
> determine which version of a plugin to load, and what version to advertise as 
> part of the REST API. This process involves reflectively constructing an 
> instance of the class and calling the version method, which can fail in the 
> following scenarios:
> 1. If a plugin throws an exception from a static initialization block
> 2. If a plugin does not have a default constructor (such as a non-static 
> inner class)
> 3. If a plugin's default constructor is not public
> 4. If a plugin throws an exception from the default constructor
> 5. If a plugin's version method throws an exception
> If any of the above is true for any single connector or rest extension on the 
> classpath or plugin.path, the plugin path scanning will exit early, and 
> potentially hide other unrelated plugins. This is primarily an issue in 
> development and test environments, because these are easy-to-make code 
> mistakes that would generally not make it to a release. Exceptions from the 
> version method, however, can cause the worker to throw
> It is desirable for the worker to instead log these exceptions and continue. 
> This will prevent one mis-implemented plugin from affecting other plugins, 
> while still causing integration tests to fail against the plugin itself. We 
> can augment logging to make it clear how to correct these failures, where 
> before it was rather opaque and difficult to debug.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

[jira] [Updated] (KAFKA-14649) Failures instantiating Connect plugins hides other plugins from REST API

2023-01-24 Thread Greg Harris (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Harris updated KAFKA-14649:

Summary: Failures instantiating Connect plugins hides other plugins from 
REST API  (was: Failures instantiating Connect plugins crash worker on startup)

> Failures instantiating Connect plugins hides other plugins from REST API
> 
>
> Key: KAFKA-14649
> URL: https://issues.apache.org/jira/browse/KAFKA-14649
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.1.0, 3.0.0, 3.2.0, 3.3.0, 3.4.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
>
> Connect plugin path scanning evaluates the version() method of plugins to 
> determine which version of a plugin to load, and what version to advertise as 
> part of the REST API. This process involves reflectively constructing an 
> instance of the class and calling the version method, which can fail in the 
> following scenarios:
> 1. If a plugin throws an exception from a static initialization block
> 2. If a plugin does not have a default constructor (such as a non-static 
> inner class)
> 3. If a plugin's default constructor is not public
> 4. If a plugin throws an exception from the default constructor
> 5. If a plugin's version method throws an exception
> If any of the above is true for any single connector or rest extension on the 
> classpath or plugin.path, the worker will fail to start up entirely. This is 
> primarily an issue in development and test environments, because these are 
> easy-to-make code mistakes that would generally not make it to a release.
> It is desirable for the worker to instead log these exceptions and continue. 
> This will prevent one mis-implemented plugin from affecting other plugins, 
> while still causing integration tests to fail against the plugin itself. We 
> can augment logging to make it clear how to correct these failures, where 
> before it was rather opaque and difficult to debug.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14649) Failures instantiating Connect plugins crash worker on startup

2023-01-24 Thread Greg Harris (Jira)
Greg Harris created KAFKA-14649:
---

 Summary: Failures instantiating Connect plugins crash worker on 
startup
 Key: KAFKA-14649
 URL: https://issues.apache.org/jira/browse/KAFKA-14649
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 3.3.0, 3.2.0, 3.0.0, 3.1.0, 3.4.0
Reporter: Greg Harris
Assignee: Greg Harris


Connect plugin path scanning evaluates the version() method of plugins to 
determine which version of a plugin to load, and what version to advertise as 
part of the REST API. This process involves reflectively constructing an 
instance of the class and calling the version method, which can fail in the 
following scenarios:

1. If a plugin throws an exception from a static initialization block
2. If a plugin does not have a default constructor (such as a non-static inner 
class)
3. If a plugin's default constructor is not public
4. If a plugin throws an exception from the default constructor
5. If a plugin's version method throws an exception

If any of the above is true for any single connector or rest extension on the 
classpath or plugin.path, the worker will fail to start up entirely. This is 
primarily an issue in development and test environments, because these are 
easy-to-make code mistakes that would generally not make it to a release.

It is desirable for the worker to instead log these exceptions and continue. 
This will prevent one mis-implemented plugin from affecting other plugins, 
while still causing integration tests to fail against the plugin itself. We can 
augment logging to make it clear how to correct these failures, where before it 
was rather opaque and difficult to debug.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vcrfxia commented on a diff in pull request #13143: KAFKA-14491: [3/N] Add logical key value segments

2023-01-24 Thread via GitHub


vcrfxia commented on code in PR #13143:
URL: https://github.com/apache/kafka/pull/13143#discussion_r1085990051


##
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This "logical segment" is a segment which shares its underlying physical 
store with other
+ * logical segments. Each segment uses a unique, fixed-length key prefix 
derived from the
+ * segment ID when writing to the shared physical store.
+ */
+class LogicalKeyValueSegment implements Comparable<LogicalKeyValueSegment>, Segment {
+private static final Logger log = 
LoggerFactory.getLogger(LogicalKeyValueSegment.class);
+
+public final long id;
+private final String name;
+private final RocksDBStore physicalStore;
+private final PrefixKeyFormatter prefixKeyFormatter;
+
+private volatile boolean open = false;
+final Set<KeyValueIterator<Bytes, byte[]>> openIterators = Collections.synchronizedSet(new HashSet<>());
+
+LogicalKeyValueSegment(final long id,
+   final String name,
+   final RocksDBStore physicalStore) {
+this.id = id;
+this.name = name;
+this.physicalStore = Objects.requireNonNull(physicalStore);
+
+this.prefixKeyFormatter = new 
PrefixKeyFormatter(serializeLongToBytes(id));
+}
+
+void openDB() {
+open = true;
+}
+
+@Override
+public int compareTo(final LogicalKeyValueSegment segment) {
+return Long.compare(id, segment.id);
+}
+
+@Override
+public synchronized void destroy() {
+final Bytes keyPrefix = prefixKeyFormatter.getPrefix();
+
+// this is a prefix deletion, because the deleteRange() implementation
+// calls Bytes.increment() in order to make keyTo inclusive
+physicalStore.deleteRange(keyPrefix, keyPrefix);
+}
+
+@Override
+public synchronized void deleteRange(final Bytes keyFrom, final Bytes 
keyTo) {
+physicalStore.deleteRange(
+prefixKeyFormatter.forPhysicalStore(keyFrom),
+prefixKeyFormatter.forPhysicalStore(keyTo));
+}
+
+@Override
+public synchronized void put(final Bytes key, final byte[] value) {
+physicalStore.put(
+prefixKeyFormatter.forPhysicalStore(key),
+value);
+}
+
+@Override
+public synchronized byte[] putIfAbsent(final Bytes key, final byte[] 
value) {
+return physicalStore.putIfAbsent(
+prefixKeyFormatter.forPhysicalStore(key),
+value);
+}
+
+@Override
+public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+physicalStore.putAll(entries.stream()
+.map(kv -> new KeyValue<>(
+prefixKeyFormatter.forPhysicalStore(kv.key),
+kv.value))
+.collect(Collectors.toList()));
+}
+
+@Override
+public synchronized byte[] delete(final Bytes key) {
+return physicalStore.delete(prefixKeyFormatter.forPhysicalStore(key));
+}
+
+@Override
+public String name() {
+return name;
+}
+
+@Deprecated
+@Override
+public void init(final ProcessorContext context, final StateStore root) {
+throw new UnsupportedOperationException("cannot initialize a logical 
segment");
+}
+
+@Override
+public void flush() {
+throw new UnsupportedOperationException("nothing to flush for logical 
segment");
+}
+
+   

[jira] [Created] (KAFKA-14648) Do not fail clients if bootstrap servers is not immediately resolvable

2023-01-24 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-14648:
---

 Summary: Do not fail clients if bootstrap servers is not 
immediately resolvable
 Key: KAFKA-14648
 URL: https://issues.apache.org/jira/browse/KAFKA-14648
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson


In dynamic environments, such as system tests, there is sometimes a delay 
between when a client is initialized and when the configured bootstrap servers 
become available in DNS. Currently clients will fail immediately if none of the 
bootstrap servers can resolve. It would be more convenient for these 
environments to provide a grace period to give more time for initialization. 
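
A sketch of the grace-period idea (not the client's actual network layer): retry 
resolution for a bounded window instead of failing on the first 
UnknownHostException.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;

public class BootstrapResolver {
    static InetAddress[] resolveWithGracePeriod(String host, Duration grace)
            throws InterruptedException, UnknownHostException {
        long deadline = System.nanoTime() + grace.toNanos();
        while (true) {
            try {
                return InetAddress.getAllByName(host);
            } catch (UnknownHostException e) {
                if (System.nanoTime() >= deadline) {
                    throw e; // grace period exhausted: fail as clients do today
                }
                Thread.sleep(1000); // DNS may simply not have propagated yet
            }
        }
    }
}
```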



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] cmccabe commented on a diff in pull request #13140: KAFKA-14644: Process should crash after failure in Raft IO thread

2023-01-24 Thread via GitHub


cmccabe commented on code in PR #13140:
URL: https://github.com/apache/kafka/pull/13140#discussion_r1085968353


##
core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala:
##
@@ -198,7 +200,8 @@ class RaftManagerTest {
   @Test
   def testShutdownIoThread(): Unit = {
 val raftClient = mock(classOf[KafkaRaftClient[String]])
-val ioThread = new RaftIoThread(raftClient, threadNamePrefix = "test-raft")
+val faultHandler = mock(classOf[FaultHandler])

Review Comment:
   use `MockFaultHandler` here
   
   then, later in the test, call `maybeRethrowFirstException` to ensure no 
faults happened.
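
   For reference, a sketch of that pattern (not the PR's actual test code), 
assuming the MockFaultHandler test helper's usual shape:

```java
import org.apache.kafka.server.fault.MockFaultHandler;

public class FaultHandlerUsage {
    public static void main(String[] args) {
        MockFaultHandler faultHandler = new MockFaultHandler("testShutdownIoThread");
        // ... construct the component under test with faultHandler and exercise it ...
        faultHandler.maybeRethrowFirstException(); // throws if any fault was recorded
    }
}
```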



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management

2023-01-24 Thread via GitHub


vcrfxia commented on code in PR #13142:
URL: https://github.com/apache/kafka/pull/13142#discussion_r1085933946


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
 @@ -351,13 +360,23 @@ public <R> QueryResult<R> query(
 @Override
 public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
                                                                                  final PS prefixKeySerializer) {
+if (userManagedIterators) {
+    throw new IllegalStateException("Must specify openIterators in call to prefixScan()");
+}
+return prefixScan(prefix, prefixKeySerializer, openIterators);
+}
+
+<PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                         final PS prefixKeySerializer,
+                                                                         final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {

Review Comment:
   No, that's a good point. I've just added this additional validation, which 
required refactoring these calls so that the two versions of the method do not 
call each other. Instead they each call a third (helper) method.
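
   In outline, the refactoring described reads like this generic sketch 
(illustrative names, not the PR's code): both overloads validate and then 
delegate to a shared helper rather than calling each other.

```java
import java.util.HashSet;
import java.util.Set;

public class IteratorOwnershipExample {
    private final boolean userManagedIterators;
    private final Set<AutoCloseable> openIterators = new HashSet<>();

    IteratorOwnershipExample(boolean userManagedIterators) {
        this.userManagedIterators = userManagedIterators;
    }

    // public version: only valid when the store manages its own iterators
    public AutoCloseable scan() {
        if (userManagedIterators) {
            throw new IllegalStateException("must pass openIterators explicitly");
        }
        return doScan(openIterators);
    }

    // package-private version: only valid when the caller manages iterators
    AutoCloseable scan(Set<AutoCloseable> callerIterators) {
        if (!userManagedIterators) {
            throw new IllegalStateException("store manages its own iterators");
        }
        return doScan(callerIterators);
    }

    private AutoCloseable doScan(Set<AutoCloseable> iterators) {
        AutoCloseable it = () -> { }; // stand-in for a real store iterator
        iterators.add(it);
        return it;
    }
}
```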



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vcrfxia commented on a diff in pull request #13142: KAFKA-14491: [2/N] Refactor RocksDB store open iterator management

2023-01-24 Thread via GitHub


vcrfxia commented on code in PR #13142:
URL: https://github.com/apache/kafka/pull/13142#discussion_r1085935187


##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStore.java:
##
 @@ -383,7 +383,9 @@ public KeyValue<Bytes, byte[]> makeNext() {
 
 @Override
 public synchronized void close() {
-openIterators.remove(this);
+if (closeCallback != null) {

Review Comment:
   Unclear whether we want to require that a closeCallback is always registered 
in general, but it is true that for these two specific classes (RocksDbIterator 
and RocksDBDualCFIterator), we do want to require that a closeCallback is 
always set. I've updated the code to reflect this.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
 @@ -351,13 +360,23 @@ public <R> QueryResult<R> query(
 @Override
 public <PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
                                                                                  final PS prefixKeySerializer) {
+if (userManagedIterators) {
+    throw new IllegalStateException("Must specify openIterators in call to prefixScan()");
+}
+return prefixScan(prefix, prefixKeySerializer, openIterators);
+}
+
+<PS extends Serializer<P>, P> KeyValueIterator<Bytes, byte[]> prefixScan(final P prefix,
+                                                                         final PS prefixKeySerializer,
+                                                                         final Set<KeyValueIterator<Bytes, byte[]>> openIterators) {

Review Comment:
   No. I've just added this additional validation, which required refactoring 
these calls so that the two versions of the method do not call each other. 
Instead they each call a third (helper) method.



##
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java:
##
@@ -114,6 +114,7 @@ public class RocksDBStore implements KeyValueStore<Bytes, byte[]>, BatchWritingStore
 private boolean userSpecifiedStatistics = false;
 
 private final RocksDBMetricsRecorder metricsRecorder;
+private final boolean userManagedIterators;

Review Comment:
   Good point. I think `selfManagedIterators` is also confusing, though, because 
it's unclear whether "self" means the store itself or the caller themselves.
   
   I've updated this to `autoManagedIterators`, which means the opposite of what 
I initially had (`userManagedIterators`), and added a comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dpcollins-google opened a new pull request, #13162: fix: replace an inefficient loop in kafka internals

2023-01-24 Thread via GitHub


dpcollins-google opened a new pull request, #13162:
URL: https://github.com/apache/kafka/pull/13162

   Instead, use Channels.newChannel to write in larger chunks.
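
   The PR description is terse, so this is a sketch of the general pattern it 
names: wrap the OutputStream in a channel and write the whole ByteBuffer in 
bulk rather than looping byte by byte.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class ChunkedWrite {
    static void writeBuffer(OutputStream out, ByteBuffer buffer) throws IOException {
        WritableByteChannel channel = Channels.newChannel(out);
        while (buffer.hasRemaining()) {
            channel.write(buffer); // bulk transfer; may take more than one call
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeBuffer(out, ByteBuffer.wrap("hello".getBytes()));
        System.out.println(out.size() + " bytes written");
    }
}
```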


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] Cerchie opened a new pull request, #13161: Kafka 14128

2023-01-24 Thread via GitHub


Cerchie opened a new pull request, #13161:
URL: https://github.com/apache/kafka/pull/13161

   In response to [14128](https://issues.apache.org/jira/browse/KAFKA-14128). 
Addresses it by moving the final catch condition into an else block. 
   
   Testing strategy: I'm attempting a unit test first.  I've cp'ed from the 
test above so I can use the MockTime and InternalTopicManager. Currently, when 
I run the test, it's throwing an error, the top line of which is: 
   
   ```
   expected org.apache.kafka.common.errors.TimeoutException to be thrown, but 
nothing was thrown
   ```
   
   This is prior to figuring out the text of the error, which is 
my next step.
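
   One plausible reason nothing is thrown: MockTime's clock is frozen unless 
something advances it, so a time-based retry loop never reaches its deadline. 
A sketch of the assertion pattern follows; runUntilDeadline() is a hypothetical 
stand-in for the InternalTopicManager call under test.

```java
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.MockTime;
import org.junit.jupiter.api.Test;

class TimeoutPatternTest {
    private final MockTime time = new MockTime();

    // hypothetical stand-in for the code under test
    private void runUntilDeadline(long timeoutMs) {
        long deadline = time.milliseconds() + timeoutMs;
        while (time.milliseconds() < deadline) {
            time.sleep(100); // MockTime.sleep() advances the mock clock
        }
        throw new TimeoutException("deadline exceeded after " + timeoutMs + " ms");
    }

    @Test
    void shouldThrowTimeout() {
        assertThrows(TimeoutException.class, () -> runUntilDeadline(1_000));
    }
}
```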


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] lucasbru opened a new pull request, #13160: chore: fix flaky DefaultStateUpdaterTest

2023-01-24 Thread via GitHub


lucasbru opened a new pull request, #13160:
URL: https://github.com/apache/kafka/pull/13160

   Mockito should not make named topologies paused by default.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram merged pull request #13119: KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-01-24 Thread via GitHub


rajinisivaram merged PR #13119:
URL: https://github.com/apache/kafka/pull/13119


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rajinisivaram commented on pull request #13119: KAFKA-14623: OAuth's HttpAccessTokenRetriever potentially leaks secrets in logging

2023-01-24 Thread via GitHub


rajinisivaram commented on PR #13119:
URL: https://github.com/apache/kafka/pull/13119#issuecomment-1402448121

   Test failures not related, merging to trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module

2023-01-24 Thread via GitHub


nizhikov commented on PR #13157:
URL: https://github.com/apache/kafka/pull/13157#issuecomment-1402420416

   I can only imagine users who excluded the mirror-client dependency for whatever 
reason :)
   I don't think we should consider such a case a blocker.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module

2023-01-24 Thread via GitHub


mimaison commented on PR #13157:
URL: https://github.com/apache/kafka/pull/13157#issuecomment-1402375492

   The Config classes are not part of the public API. I suggested moving the 
interfaces without thinking too much about it. Have you considered whether this 
could break anything?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13136: KAFKA-14582: Move JmxTool to tools

2023-01-24 Thread via GitHub


mimaison commented on code in PR #13136:
URL: https://github.com/apache/kafka/pull/13136#discussion_r1085565198


##
checkstyle/import-control.xml:
##
@@ -347,7 +347,7 @@
 
   
 
-

Review Comment:
   We seem to have the trailing space everywhere else, so maybe keep it here too



##
core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala:
##
@@ -801,21 +801,15 @@ object ConsumerGroupCommand extends Logging {
 partitionsToReset.map { topicPartition =>
   logStartOffsets.get(topicPartition) match {
 case Some(LogOffsetResult.LogOffset(offset)) => (topicPartition, 
new OffsetAndMetadata(offset))
-case _ => {
-  CommandLineUtils.printUsageAndDie(opts.parser, s"Error getting 
starting offset of topic partition: $topicPartition")
-  Exit.exit(1)
-}
+case _ => ToolsUtils.printUsageAndDie(opts.parser, s"Error getting 
starting offset of topic partition: $topicPartition")

Review Comment:
   Can you remove other changes not related to JmxTool? 



##
tools/src/main/java/org/apache/kafka/tools/JmxCommand.java:
##
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.MBeanFeatureInfo;
+import javax.management.MBeanInfo;
+import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiPredicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A program for reading JMX metrics from a given endpoint.
+ * 
+ * This tool only works reliably if the JmxServer is fully initialized prior 
to invoking the tool.
+ * See KAFKA-4620 for details.
+ */
+public class JmxCommand {
+public static void main(String[] args) {
+Exit.exit(mainNoExit(args));
+}
+
+static int mainNoExit(String... args) {
+try {
+execute(args);
+return 0;
+} catch (TerseException e) {
+System.err.println(e.getMessage());
+return 1;
+} catch (Throwable e) {
+System.err.println(e.getMessage());
+System.err.println(Utils.stackTrace(e));
+return 1;
+}
+}
+
+static void execute(String... args) throws Exception {
+JmxCommandOptions options = new JmxCommandOptions(args);
+CommandLineUtils.printHelpAndExitIfNeeded(options, "Dump JMX values to 
standard output.");
+
+Optional attributesInclude = options.attributesInclude();
+Optional dateFormat = options.dateFormat();
+String reportFormat = options.parseFormat();
+boolean keepGoing = true;
+
+MBeanServerConnection conn = connectToBeanServer(options);
+List queries = options.queries();
+boolean hasPatternQueries = 
queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern);
+
+Set found = findObjectsIfNoPattern(options, conn, queries, 
hasPatternQueries);
+Map numExpectedAttributes =
+findNumExpectedAttributes(conn, attributesInclude, 
hasPatternQueries, queries, found);
+
+

[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-24 Thread via GitHub


fvaleri commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1085675650


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) 
{
+return commandOpts.args.length == 0 || 
commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if the `--version` option is present on the command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the version check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions 
commandOpts) {
+return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print a help message if there are no options or the `--help`
+ * option is given on the command line; if `--version` is specified,
+ * print version information and exit.
+ * NOTE: The function name is not strictly speaking correct anymore

Review Comment:
   What about maybePrintHelpOrVersion? 
   
   I think the exit part is kind of implicit when you ask for help or version.
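
   For context, a usage sketch of the helpers quoted above; MyTool and 
MyToolOptions are hypothetical, and printHelpAndExitIfNeeded is assumed to be 
the entry point the rename discussion is about.

```java
import joptsimple.OptionSpec;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;

public class MyTool {
    static class MyToolOptions extends CommandDefaultOptions {
        final OptionSpec<String> topicOpt;

        MyToolOptions(String[] args) {
            super(args);
            topicOpt = parser.accepts("topic", "Topic to inspect")
                .withRequiredArg().ofType(String.class);
            options = parser.parse(args);
        }
    }

    public static void main(String[] args) {
        MyToolOptions opts = new MyToolOptions(args);
        // prints help (no args or --help) or version (--version) and exits
        CommandLineUtils.printHelpAndExitIfNeeded(opts, "Inspect a topic.");
        System.out.println("topic = " + opts.options.valueOf(opts.topicOpt));
        Exit.exit(0);
    }
}
```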



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-24 Thread via GitHub


fvaleri commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1085675650


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) 
{
+return commandOpts.args.length == 0 || 
commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if the `--version` option is present on the command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the version check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions 
commandOpts) {
+return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print a help message if there are no options or the `--help`
+ * option is given on the command line; if `--version` is specified,
+ * print version information and exit.
+ * NOTE: The function name is not strictly speaking correct anymore

Review Comment:
   What about maybePrintHelpOrVersion? I think the exit part is kind of 
implicit when you ask for help or version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-24 Thread via GitHub


fvaleri commented on PR #13131:
URL: https://github.com/apache/kafka/pull/13131#issuecomment-1402347644

   > we have pushed Exit.exit(1) down into CommandLineUtils.
   
   @clolov it wasn't pushed down; that's the original logic. The code you are 
referring to in ZkSecurityMigrator.scala:103 is outdated. We now use 
ToolsUtils.printUsageAndDie in a few places, for the reason expressed in the 
method comment. Please pull the latest changes and let me know.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] fvaleri commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-24 Thread via GitHub


fvaleri commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1085675650


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) {
+    return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if there is `--version` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) {
+    return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print help message if there is no options or `--help` option
+ * from command line, if `--version` is specified on the command line
+ * print version information and exit.
+ * NOTE: The function name is not strictly speaking correct anymore

Review Comment:
   What about maybePrintHelpOrVersion?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-13999) Add ProducerCount metrics (KIP-847)

2023-01-24 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya reassigned KAFKA-13999:


Assignee: Anastasia Vela

> Add ProducerCount metrics (KIP-847)
> ---
>
> Key: KAFKA-13999
> URL: https://issues.apache.org/jira/browse/KAFKA-13999
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Artem Livshits
>Assignee: Anastasia Vela
>Priority: Minor
> Fix For: 3.5.0
>
>
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerCount+metrics



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-13999) Add ProducerCount metrics (KIP-847)

2023-01-24 Thread Divij Vaidya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13999?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Divij Vaidya updated KAFKA-13999:
-
Fix Version/s: 3.5.0

> Add ProducerCount metrics (KIP-847)
> ---
>
> Key: KAFKA-13999
> URL: https://issues.apache.org/jira/browse/KAFKA-13999
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Artem Livshits
>Priority: Minor
> Fix For: 3.5.0
>
>
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerCount+metrics



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle

2023-01-24 Thread Bojan Blagojevic (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680275#comment-17680275
 ] 

Bojan Blagojevic commented on KAFKA-14639:
--

Thank you again for the quick response. I will try to answer your questions.
h5. Answers:
??1. Can you check and/or provide all the logs from consumer-3 between gen 639 
and gen 640? Is there anything in there about resetting the generation, 
dropping out of the group, resetting the member id, anything at all like that???
I don't see anything like that happening between gen 639 and gen 640. I 
attached the log excerpt covering a few surrounding generations 
[^consumers-jira.log].

??2. The only other thing off the top of my head to check would be that every 
single consumer was configured with (only) the CooperativeStickyAssignor over 
the full period from gen 639 through the end of gen 640, or at least check the 
group leader (consumer-5 I think?) and consumers 3 & 4.??
The full logs have unfortunately expired, but I am pretty sure that all the 
consumers were configured with only `CooperativeStickyAssignor`. They are part 
of a Kubernetes deployment in which all the pods share the configuration. I 
observed correct group leader behaviour when ownership of other partitions 
changed. I followed the ownership changes when partition *partition-68* was 
moved. It belonged to the consumer partition-2-6b9db8686f-hswvn... in 
generation 639.
{code:java}
Final assignment of partitions to consumers:
partition-2-6b9db8686f-hswvn-bbcfa7e4-7d5b-4227-ad62-99e8cc6e176f=[partition-20,
 partition-68]
...
Finished assignment for group at generation 639:
partition-2-6b9db8686f-hswvn-bbcfa7e4-7d5b-4227-ad62-99e8cc6e176f=Assignment(partitions=[partition-20,
 partition-68]) {code}
Between generation 639 and generation 640 a new pod joins, 
pod-6b9db8686f-p87m9. One of the Kafka consumers belonging to this pod, 
partition-3-6b9db8686f-p87m9..., gets partition-68 assigned in generation 640, 
and this is logged in AbstractStickyAssignor.constrainedAssign.
{code:java}
Final assignment of partitions to consumers:
partition-3-6b9db8686f-p87m9-737d9359-daa5-4d89-9e4c-40a433aa8c6c=[partition-68]{code}
Since this partition is changing ownership, it does not show up in the log of 
ConsumerCoordinator, which is expected.
{code:java}
Finished assignment for group at generation 640:
partition-3-6b9db8686f-p87m9-737d9359-daa5-4d89-9e4c-40a433aa8c6c=Assignment(partitions=[]){code}
And it gets assigned in generation 641:
 
{code:java}
Final assignment of partitions to consumers: 
partition-3-6b9db8686f-psfx4-5dd12c98-e698-44aa-9131-56281e798369=[partition-68]
 
... 
Finished assignment for group at generation 641: 
partition-3-6b9db8686f-psfx4-5dd12c98-e698-44aa-9131-56281e798369=Assignment(partitions=[partition-68]){code}
It gets assigned to a consumer different from the one determined in generation 
640, but this does not break the rebalance barrier.
h5. Additional notes
Not sure if it matters, but I saw several consumers logging:
{code:java}
... org.apache.kafka.clients.Metadata ... Resetting the last seen epoch of 
partition partition-74 to 149 since the associated topicId changed from null to 
nixqUZnpQYWjY0RreaCczA" {code}
 
I think that the heartbeat thread (I am assuming this based on the [code I've 
read|https://github.com/apache/kafka/blob/3.2.1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L1193])
 is requesting a rejoin on behalf of the consumers. Again, not sure if it 
matters:
{code:java}
"2022-12-14 11:17:48 1 --- [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-4-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:48 1 --- [consumer-2] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-2-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:48 1 --- [my-group-id] 
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer 
clientId=partition-4-my-client-id-my-group-id-random_hash, groupId=my-group-id] 
Request joining group due to: group is already rebalancing"
"2022-12-14 11:17:48 1 --- [my-group-id] 
o.a.k.c.c.internals.ConsumerCoordinator : [Consumer 
clientId=partition-2-my-client-id-my-group-id-random_hash, groupId=my-group-id] 
Request joining group due to: group is already rebalancing"
"2022-12-14 11:17:47 1 --- [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-3-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:47 1 --- [consumer-1] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer clientId=partition-1-my-client-id-my-group-id-random_hash, 
groupId=my-group-id] (Re-)joining group"
"2022-12-14 11:17:47 1 --- [consumer-0] o.a.k.c.c.internals.ConsumerCoordinator 
: [Consumer 

[jira] [Updated] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle

2023-01-24 Thread Bojan Blagojevic (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bojan Blagojevic updated KAFKA-14639:
-
Attachment: consumers-jira.log

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance 
> cycle
> 
>
> Key: KAFKA-14639
> URL: https://issues.apache.org/jira/browse/KAFKA-14639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Bojan Blagojevic
>Priority: Major
> Attachments: consumers-jira.log
>
>
> I have an application that runs 6 consumers in parallel. I am getting some 
> unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I 
> understand the mechanism correctly, if a consumer loses a partition in one 
> rebalance cycle, the partition should be reassigned in the next rebalance 
> cycle.
> This assumption is based on the 
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
>  documentation and few blog posts that describe the protocol, like [this 
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
>  on Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but 
> instead may indicate consumers the need for partition revocation so that the 
> revoked partitions can be reassigned to other consumers in the next rebalance 
> event. This is designed for sticky assignment logic which attempts to 
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering 
> a second rebalance so that its revoked partitions can be assigned. Until 
> then, these partitions are unowned and unassigned.
> {quote}
> These are the logs from the application that uses 
> {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle 
> ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to 
> {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 
> consumers.
> Mind that the log is in reverse(bottom to top)
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-59])
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> [partition-26, partition-74] as indicated by the current assignment and 
> re-join
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-26, partition-74] revoked successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Finished 
> removing partition data
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] (Re-)joining group
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-74, seek to min common offset: 107317730
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-74] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-4] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-74]
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-74
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> Assignment(partitions=[partition-74])
> 2022-12-14 11:18:24 1 — [consumer-4] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-4-my-client-id-my-group-id, 
> groupId=my-group-id] Request joining group due to: need to revoke partitions 
> 

[GitHub] [kafka] viktorsomogyi commented on a diff in pull request #12992: KIP-887: Add ConfigProvider to make use of environment variables

2023-01-24 Thread via GitHub


viktorsomogyi commented on code in PR #12992:
URL: https://github.com/apache/kafka/pull/12992#discussion_r1085559673


##
clients/src/main/java/org/apache/kafka/common/config/provider/EnvVarConfigProvider.java:
##
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class EnvVarConfigProvider implements ConfigProvider {
+private final Map<String, String> envVarMap;
+
+public EnvVarConfigProvider() {
+envVarMap = getEnvVars();
+}
+
+public EnvVarConfigProvider(Map<String, String> envVarsAsArgument) {
+envVarMap = envVarsAsArgument;
+}
+
+private static final Logger log = LoggerFactory.getLogger(EnvVarConfigProvider.class);
+
+@Override
+public void configure(Map<String, ?> configs) {
+}
+
+@Override
+public void close() throws IOException {
+}
+
+/**
+ * @param s unused
+ * @return returns environment variables as configuration
+ */
+@Override
+public ConfigData get(String s) {
+return get(s, null);
+}
+
+/**
+ * @param s unused
+ * @param keys the keys whose values will be retrieved.
+ * @return the configuration data.
+ */
+@Override
+public ConfigData get(String s, Set<String> keys) {

Review Comment:
   It may be a bit more defensive if we validate that the given path is either 
empty or null, and reject it otherwise, since we don't support paths.
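   
   A minimal sketch of that validation, reusing only the imports already in 
this diff (the exact exception message is illustrative):
   
   ```java
   @Override
   public ConfigData get(String s, Set<String> keys) {
       // Environment variables have no path hierarchy, so only a null or empty path is valid.
       if (s != null && !s.isEmpty()) {
           throw new ConfigException("Path is not supported for EnvVarConfigProvider: '" + s + "'");
       }
       if (keys == null) {
           return new ConfigData(envVarMap);
       }
       // Return only the requested environment variables.
       Map<String, String> filtered = new HashMap<>(envVarMap);
       filtered.keySet().retainAll(keys);
       return new ConfigData(filtered);
   }
   ```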



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] nizhikov commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module

2023-01-24 Thread via GitHub


nizhikov commented on PR #13157:
URL: https://github.com/apache/kafka/pull/13157#issuecomment-1402195231

   @mimaison Thanks for the review.
   
   What about the *Config classes, like `MirrorCheckpointConfig` or 
`MirrorConnectorConfig`?
   Are they part of the public API?
   Should I move them to mirror-client?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on pull request #13157: KAFKA-14599 Enable javadoc for :connect:mirror module

2023-01-24 Thread via GitHub


mimaison commented on PR #13157:
URL: https://github.com/apache/kafka/pull/13157#issuecomment-1402188145

   @nizhikov Thanks for looking at this issue. I don't think we can simply 
enable the javadoc on the `connect:mirror` project as it will also include all 
public classes from that module that are not part of the public API. We only 
want to include the interfaces users can implement. 
   
   We should be able to use a filter to select the classes to include. 
Otherwise I wonder if we could move the classes we want to the mirror-client 
module. The package should be the same, and as mirror depends on it, it should 
hopefully not break anything.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mimaison commented on a diff in pull request #13131: KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes

2023-01-24 Thread via GitHub


mimaison commented on code in PR #13131:
URL: https://github.com/apache/kafka/pull/13131#discussion_r1085116578


##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) {
+    return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if there is `--version` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) {
+    return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print help message if there is no options or `--help` option
+ * from command line, if `--version` is specified on the command line
+ * print version information and exit.
+ * NOTE: The function name is not strictly speaking correct anymore

Review Comment:
   Should we take this opportunity to update this name?



##
server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java:
##
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+import joptsimple.OptionSpec;
+import org.apache.kafka.common.utils.AppInfoParser;
+import org.apache.kafka.common.utils.Exit;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Helper functions for dealing with command line utilities.
+ */
+public class CommandLineUtils {
+/**
+ * Check if there are no options or `--help` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintHelpNeeded(CommandDefaultOptions commandOpts) {
+    return commandOpts.args.length == 0 || commandOpts.options.has(commandOpts.helpOpt);
+}
+
+/**
+ * Check if there is `--version` option from command line.
+ *
+ * @param commandOpts Acceptable options for a command
+ * @return true on matching the help check condition
+ */
+public static boolean isPrintVersionNeeded(CommandDefaultOptions commandOpts) {
+    return commandOpts.options.has(commandOpts.versionOpt);
+}
+
+/**
+ * Check and print help message if there is no options or `--help` option
+ * 

[GitHub] [kafka] MPeli commented on pull request #6329: KAFKA-1194: Fix renaming open files on Windows

2023-01-24 Thread via GitHub


MPeli commented on PR #6329:
URL: https://github.com/apache/kafka/pull/6329#issuecomment-1402091415

   Hi, I have created a new pull request. See #12331 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-14565) Fix Interceptor Resource Leakage

2023-01-24 Thread Terry Beard (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14565?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Terry Beard updated KAFKA-14565:

Summary: Fix Interceptor Resource Leakage  (was: Improve Interceptor 
Resource Leakage Prevention)

> Fix Interceptor Resource Leakage
> 
>
> Key: KAFKA-14565
> URL: https://issues.apache.org/jira/browse/KAFKA-14565
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Terry Beard
>Assignee: Terry Beard
>Priority: Major
> Fix For: 3.5.0
>
>
> The Consumer and Producer interceptor interfaces and their corresponding 
> Kafka Consumer and Producer constructors do not adequately support cleanup of 
> underlying interceptor resources. 
> Currently, within the Kafka Consumer and Kafka Producer constructors, 
> *AbstractConfig.getConfiguredInstances()* is delegated responsibility for 
> both creating and configuring each interceptor listed in the 
> interceptor.classes property, and it returns a configured *List* of 
> interceptors.
> This dual responsibility for both creation and configuration is problematic 
> when multiple interceptors are involved and at least one interceptor's 
> configure method creates and/or depends on objects that create threads, 
> connections or other resources requiring clean-up, while a subsequent 
> interceptor's configure method raises a runtime exception. The runtime 
> exception produces a resource leak in the first interceptor, because the 
> interceptor container, i.e. ConsumerInterceptors/ProducerInterceptors, is 
> never created, and therefore the first interceptor's close method (and 
> indeed any interceptor's) is never called.
> To help ensure the respective container interceptors are able to invoke their 
> respective interceptor close methods for proper resource clean up, I propose 
> two approaches:
> +*PROPOSAL 1*+
> Define a default *open* or *configureWithResources()* or *acquireResources()* 
> method, with no implementation and a checked exception, on the respective 
> Consumer/Producer interceptor interfaces. This method, as part of the 
> interceptor life cycle management, will be responsible for creating threads 
> and/or objects which utilize threads, connections or other resources that 
> require clean-up. Additionally, this default method enables implementation 
> optionality, as its empty default behavior means it will do nothing when 
> unimplemented, mitigating the backwards-compatibility impact on existing 
> interceptors. Finally, the Kafka Consumer/Producer interceptor containers 
> will implement a corresponding *maybeOpen* or *maybeConfigureWithResources* 
> or *maybeAcquireResources* method which also throws a checked exception.
> See below code excerpt for the Consumer/Producer constructor:
> {code:java}
> List<ConsumerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
> ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
> ConsumerInterceptor.class,
> Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
> this.interceptors = new ConsumerInterceptors<>(interceptorList);
> this.interceptors.maybeConfigureWithResources();
>  {code}
> +*PROPOSAL 2*+
> To avoid changing any public interfaces and the subsequent KIP process, we can
>  * Create a class which inherits or wraps AbstractConfig that contains a new 
> method which will return a ConfiguredInstanceResult class.  This 
> ConfiguredInstanceResult  class will contain an optional list of successfully 
> created interceptors and/or exception which occurred while calling each 
> Interceptor::configure.  Additionally, it will contain a helper method to 
> rethrow an exception as well as a method which returns the underlying 
> exception.  The caller is expected to handle the exception and perform clean 
> up e.g. call  Interceptor::close  on each interceptor in the list provided by 
> the ConfiguredInstanceResult class.
>  * Automatically invoke {{close}} on any {{Closeable}} or {{AutoCloseable}} 
> instances if/when a failure occurs
>  * Add a new overloaded {{getConfiguredInstance}} / 
> {{getConfiguredInstances}} variant that allows users to specify whether 
> already-instantiated classes should be closed or not when a failure occurs
>  * Add a new exception type to the public API that includes a list of all of 
> the successfully-instantiated (and/or successfully-configured) instances 
> before the error was encountered so that callers can choose how to handle the 
> failure however they want (and possibly so that instantiation/configuration 
> can be attempted on every class before throwing the exception)
>  
>  
>  
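
A rough sketch of the container-side behaviour Proposal 1 describes 
(hypothetical: configureWithResources() is the proposed new default method, 
not an existing Kafka API; Utils.closeQuietly does the clean-up):
{code:java}
static <K, V> void maybeConfigureWithResources(List<ConsumerInterceptor<K, V>> interceptors) throws Exception {
    List<ConsumerInterceptor<K, V>> configured = new ArrayList<>();
    try {
        for (ConsumerInterceptor<K, V> interceptor : interceptors) {
            interceptor.configureWithResources(); // proposed default no-op unless overridden
            configured.add(interceptor);
        }
    } catch (Exception e) {
        // Close only the interceptors whose resources were actually acquired,
        // so a failure part-way through does not leak threads or connections.
        for (ConsumerInterceptor<K, V> interceptor : configured) {
            Utils.closeQuietly(interceptor, "consumer interceptor");
        }
        throw e;
    }
}
{code}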



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14487) Move LogManager to storage module

2023-01-24 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14487?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680230#comment-17680230
 ] 

Sagar Rao commented on KAFKA-14487:
---

[~ijuma], is it ok if I pick this one up? It seems it's not blocked by any 
other activities you all are working on. Please let me know. Thanks

> Move LogManager to storage module
> -
>
> Key: KAFKA-14487
> URL: https://issues.apache.org/jira/browse/KAFKA-14487
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] vamossagar12 commented on pull request #12561: KAFKA-12495: Exponential backoff retry to prevent rebalance storms when worker joins after revoking rebalance

2023-01-24 Thread via GitHub


vamossagar12 commented on PR #12561:
URL: https://github.com/apache/kafka/pull/12561#issuecomment-1402008138

   @C0urante , thanks for your response. Makes sense.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ruslankrivoshein commented on pull request #13074: MINOR: upgrade.from is revealed for Upgrade doc

2023-01-24 Thread via GitHub


ruslankrivoshein commented on PR #13074:
URL: https://github.com/apache/kafka/pull/13074#issuecomment-1401965227

   @hachikuji, what do you think about this tiny edition?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #13111: KAFKA-14190: Update Zk TopicId from locally stored cache in controller

2023-01-24 Thread via GitHub


divijvaidya commented on PR #13111:
URL: https://github.com/apache/kafka/pull/13111#issuecomment-1401824565

   @dajac 
   > Will this code still be around by the time tiered storage is completed?
   I don't know, but my point is that this code change is simple and safe 
enough to add to the current code as of today.
   
   @jolshan 
   > My other concern here is that even though this fixes the issue in the case 
where the controller stays the same, it doesn't cover controller re-election. 
This means we would still have to share and support the recovery methods. If 
this is a big issue for tiered storage, then we could still be in trouble.
   
   To be very precise here, this fix won't work if the controller context does 
not have the old topic Id. That will only happen when a controller failover 
takes place exactly in the window between the admin overwriting Zk and the 
controller being updated. Note that a controller failover at any other time 
will work fine (since the controller will recreate the controller context from 
Zk, which would have been updated with the oldTopicId earlier). 
   
   And yes, I agree this is not a 100% fix, but it's a start. Since it's a 
safe fix and doesn't have side effects, we should push it out.
   
   > Also curious if we can upload a segment with the wrong ID if the leader 
and ISR request is blocked (and thus can't become a leader or follower)
   
   Great question! The topic Id mismatch check [during handling of LISR 
request](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L1495)
 is based on matching the local topic Id in the broker with the one that is 
sent with LISR. However, it's very much possible to not have any topicId 
locally. As an example, let's say a partition reassignment leads to partition 
placement on a broker where the log hasn't been created so far. In such cases, 
LISR won't throw a topic mismatch error and won't be blocked; instead, the 
broker will start operating with the new topic Id. Now we will have some 
followers working with the old topic Id (where LISR was blocked) and some with 
the new topic Id. If a failover happens to one with the new topic Id, it will 
start uploading segments to tiered storage with the new topic Id and thus, for 
the same topic partition, we will have segments with the old topic Id as well 
as the new topic Id.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on pull request #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-01-24 Thread via GitHub


vamossagar12 commented on PR #13158:
URL: https://github.com/apache/kafka/pull/13158#issuecomment-1401815286

   @fvaleri, please review this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 opened a new pull request, #13158: KAFKA-14647: Moving TopicFilter to server-common/utils

2023-01-24 Thread via GitHub


vamossagar12 opened a new pull request, #13158:
URL: https://github.com/apache/kafka/pull/13158

   Moving TopicFilter to server-common/utils


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] divijvaidya commented on pull request #12331: KAFKA-1194: changes needed to run on Windows

2023-01-24 Thread via GitHub


divijvaidya commented on PR #12331:
URL: https://github.com/apache/kafka/pull/12331#issuecomment-1401813587

   @sjetha you should probably send an email to the 
[d...@kafka.apache.org](mailto:d...@kafka.apache.org) mailing list, explaining 
the urgency and asking for someone to take a look at this. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-24 Thread Jochen Schalanda (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680181#comment-17680181
 ] 

Jochen Schalanda commented on KAFKA-14646:
--

Could it still be that the check in 
[https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
 is too restrictive and should read *{{record.value().getVersion() > 
SubscriptionWrapper.CURRENT_VERSION}}* instead?
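
For reference, the relaxed check being suggested would look like this (a 
sketch of the proposed condition; only wrappers newer than what this Streams 
version understands would be rejected):
{code:java}
if (record.value().getVersion() > SubscriptionWrapper.CURRENT_VERSION) {
    throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
}
{code}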

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
> Key: KAFKA-14646
> URL: https://issues.apache.org/jira/browse/KAFKA-14646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
> Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>Reporter: Jochen Schalanda
>Priority: Major
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After swiftly looking through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again since the internal topics might 
> already have been updated to use the "new" version of 
> {{{}SubscriptionWrapper{}}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] nizhikov opened a new pull request, #13157: KAFKA-14599 Enable javadoc for :connect:mirror module

2023-01-24 Thread via GitHub


nizhikov opened a new pull request, #13157:
URL: https://github.com/apache/kafka/pull/13157

   Currently, javadoc task disabled for `:connect:mirror` module.
   This PR enables it.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] yashmayya commented on a diff in pull request #12984: KAFKA-14455: Kafka Connect create and update REST APIs should surface failures while writing to the config topic

2023-01-24 Thread via GitHub


yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1084960389


##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -712,8 +733,16 @@ KafkaBasedLog 
setupAndCreateKafkaBasedLog(String topic, final Wo
 }
 
 private void sendPrivileged(String key, byte[] value) {
+sendPrivileged(key, value, null);
+}
+
+private void sendPrivileged(String key, byte[] value, Callback callback) {
 if (!usesFencableWriter) {
-configLog.send(key, value);

Review Comment:
   Yeah, that's right, the API response will only include the generic top level 
message. 
   
   > I think it'd be nice to include more detail on the cause of the failure
   
   I strongly agree, and this was discussed in some more detail on the [other 
thread](https://github.com/apache/kafka/pull/12984#discussion_r1064077119).
   
   > We wouldn't be making it more vague. The message would state that the 
write to the config topic failed which is the cause for failure. Since the 
exception mapper used by Connect's REST server only writes the [top level 
exception's 
message](https://github.com/apache/kafka/blob/d798ec779c25dba31fa5ee9384d159ed54c6e07b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java#L72)
 to the response (i.e. nested exceptions aren't surfaced via the REST API 
response), I think it makes sense to keep the top level exception's message 
generic and allow users to debug further via the worker logs (where the entire 
exception chain's stack trace will be visible).
   ...
   The reasoning here is that since a Connect user may not even know that 
Connect uses a producer under the hood to write certain requests to the config 
topic for asynchronous processing, it would make more sense to have an 
informative Connect specific exception message rather than directly throwing 
the producer exception which may or may not contain enough details to be 
relevant to a Connect user.
   
   > Another option for the above issue could be changing the exception mapper 
to concatenate all the exception messages from the exception chain.
   
   > Yet another option for this could be to simply append a "Check the worker 
logs for more details on the error" to the top level exception's message in the 
REST API response (the worker logs will have the entire exception chain). 
Thoughts?
   
   What do you think about modifying the exception mapper to be more 
informative (either in this PR or a separate one)?
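   
   A rough sketch of the concatenation idea (hypothetical helper, not 
Connect's current exception mapper code):
   
   ```java
   // Walk the cause chain and join all messages, so nested producer errors
   // become visible in the REST response instead of only the top-level message.
   static String messageChain(Throwable error) {
       StringBuilder message = new StringBuilder();
       for (Throwable cause = error; cause != null; cause = cause.getCause()) {
           if (message.length() > 0) {
               message.append(": ");
           }
           message.append(cause.getMessage());
       }
       return message.toString();
   }
   ```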
   



##
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##
@@ -711,9 +742,9 @@ KafkaBasedLog 
setupAndCreateKafkaBasedLog(String topic, final Wo
 return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminSupplier);
 }
 
-private void sendPrivileged(String key, byte[] value) {
+private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException {
 if (!usesFencableWriter) {
-configLog.send(key, value);
+configLog.send(key, value).get();

Review Comment:
   Thanks Chris, both great points. The `get` without a timeout here was 
definitely a miss on my part. I've addressed both of the concerns you raised 
in the latest patch (including batching multiple sends in a single transaction 
for the EOS-enabled case).
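   
   For illustration, the timeout-bounded form discussed above could look like 
this (a sketch; the constant name and value are placeholders, not necessarily 
what the patch uses):
   
   ```java
   private static final long SEND_TIMEOUT_MS = 30_000L; // placeholder value
   
   private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException, TimeoutException {
       if (!usesFencableWriter) {
           // Bound the blocking wait so a stuck producer cannot hang the request thread forever.
           configLog.send(key, value).get(SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS);
           return;
       }
       // The fencable-writer (EOS) path is omitted in this sketch.
   }
   ```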



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Comment Edited] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-24 Thread Jochen Schalanda (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680143#comment-17680143
 ] 

Jochen Schalanda edited comment on KAFKA-14646 at 1/24/23 8:42 AM:
---

Unfortunately the two rolling updates (with {{upgrade.from="3.2"}} and then 
removing the setting again) didn't help.

We still see the same exception:
{code:java}
org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper 
is of an incompatible version. {code}
 

[~mjsax] Do you have any hints how to resolve this issue? We see it in only 2 
topologies out of 15 and I'm afraid that downgrading to Kafka Streams 3.2.3 
will break something else now.

 


was (Author: joschi):
Unfortunately the two rolling updates (with {{upgrade.from="3.2"}} and then 
removing the setting again) didn't help.

We still see the same exception:

 
{code:java}
org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper 
is of an incompatible version. {code}
 

[~mjsax] Do you have any hints how to resolve this issue? We see it in only 2 
topologies out of 15 and I'm afraid that downgrading to Kafka Streams 3.2.3 
will break something else now.

 

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
> Key: KAFKA-14646
> URL: https://issues.apache.org/jira/browse/KAFKA-14646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
> Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>Reporter: Jochen Schalanda
>Priority: Major
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After swiftly looking through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again since the internal topics might 
> already have been updated to use the "new" version of 
> {{{}SubscriptionWrapper{}}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-24 Thread Jochen Schalanda (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17680143#comment-17680143
 ] 

Jochen Schalanda commented on KAFKA-14646:
--

Unfortunately the two rolling updates (with {{upgrade.from="3.2"}} and then 
removing the setting again) didn't help.

We still see the same exception:

 
{code:java}
org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper 
is of an incompatible version. {code}
 

[~mjsax] Do you have any hints how to resolve this issue? We see it in only 2 
topologies out of 15 and I'm afraid that downgrading to Kafka Streams 3.2.3 
will break something else now.

 

> SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 
> 3.3.2)
> 
>
> Key: KAFKA-14646
> URL: https://issues.apache.org/jira/browse/KAFKA-14646
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
> Environment: Kafka Streams 3.2.3 (before update)
> Kafka Streams 3.3.2 (after update)
> Java 17 (Eclipse Temurin 17.0.5), Linux x86_64
>Reporter: Jochen Schalanda
>Priority: Major
>
> Hey folks,
>  
> we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
> started getting the following exceptions:
> {code:java}
> org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version. {code}
> After swiftly looking through the code, this exception is potentially thrown 
> in two places:
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
>  ** Here the check was changed in Kafka 3.3.x: 
> [https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]
>  * 
> [https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
>  ** Here the check wasn't changed.
>  
> Is it possible that the second check in 
> {{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
>  
> Any hints how to resolve this issue without a downgrade?
> Since this only affects 2 of 15 topologies in the application, I'm hesitant 
> to just downgrade to Kafka 3.2.3 again since the internal topics might 
> already have been updated to use the "new" version of 
> {{{}SubscriptionWrapper{}}}.
>  
> Related discussion in the Confluent Community Slack: 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
> h2. Stack trace
> {code:java}
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_8, 
> processor=XXX-joined-changed-fk-subscription-registration-source, 
> topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
> partition=8, offset=12297976, 
> stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
> SubscriptionWrapper is of an incompatible version.
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
> at 
> org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
> at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14646) SubscriptionWrapper is of an incompatible version (Kafka Streams 3.2.3 -> 3.3.2)

2023-01-24 Thread Jochen Schalanda (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jochen Schalanda updated KAFKA-14646:
-
Description: 
Hey folks,
 
we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
started getting the following exceptions:
{code:java}
org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper 
is of an incompatible version. {code}
After swiftly looking through the code, this exception is potentially thrown in 
two places:
 * 
[https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
 ** Here the check was changed in Kafka 3.3.x: 
[https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]

 * 
[https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
 ** Here the check wasn't changed.

 
Is it possible that the second check in 
{{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
 
Any hints how to resolve this issue without a downgrade?
Since this only affects 2 of 15 topologies in the application, I'm hesitant to 
just downgrade to Kafka 3.2.3 again since the internal topics might already 
have been updated to use the "new" version of {{{}SubscriptionWrapper{}}}.
 
Related discussion in the Confluent Community Slack: 
[https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
h2. Stack trace
{code:java}
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_8, processor=XXX-joined-changed-fk-subscription-registration-source, 
topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
partition=8, offset=12297976, 
stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
SubscriptionWrapper is of an incompatible version.

at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
 {code}

  was:
Hey folks,
 
we've just updated an application from *_Kafka Streams 3.2.3 to 3.3.2_* and 
started getting the following exceptions:
{code:java}
org.apache.kafka.common.errors.UnsupportedVersionException: SubscriptionWrapper 
is of an incompatible version. {code}
After a quick look through the code, it seems this exception can be thrown in 
two places:
 * 
[https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinForeignProcessorSupplier.java#L73-L78]
 ** Here the check was changed in Kafka 3.3.x: 
[https://github.com/apache/kafka/commit/9dd25ecd9ce17e608c6aba98e0422b26ed133c12]

 * 
[https://github.com/apache/kafka/blob/3.3.2/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionStoreReceiveProcessorSupplier.java#L94-L99]
 ** Here the check wasn't changed.

 
Is it possible that the second check in 
{{SubscriptionStoreReceiveProcessorSupplier}} was forgotten?
 
Any hints on how to resolve this issue without a downgrade?
Since this only affects 2 of 20+ topologies in the application, I'm hesitant to 
just downgrade to Kafka 3.2.3 again since the internal topics might already 
have been updated to use the "new" version of {{{}SubscriptionWrapper{}}}.
 
Related discussion in the Confluent Community Slack: 
[https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1674497054507119]
h2. Stack trace
{code:java}
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. 
taskId=1_8, processor=XXX-joined-changed-fk-subscription-registration-source, 
topic=topic.rev7-XXX-joined-changed-fk-subscription-registration-topic, 
partition=8, offset=12297976, 
stacktrace=org.apache.kafka.common.errors.UnsupportedVersionException: 
SubscriptionWrapper is of an incompatible version.

at 
org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:750)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
at 
org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
at 
org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:770)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
 {code}

[jira] [Commented] (KAFKA-14639) Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance cycle

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14639?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680141#comment-17680141
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14639:


Thanks for the additional logs; they do indeed verify that both consumers 
participated in the same rebalance. My next guess would be that, for some 
reason, consumer-3 had "lost" its partitions prior to joining the group in gen 
640, in which case they could be freely given away to another consumer in that 
same generation. Can you check and/or provide all the logs from consumer-3 
between gen 639 and gen 640? Is there anything in there about resetting the 
generation, dropping out of the group, resetting the member id, anything at 
all like that? 

I also notice that the assignment changes drastically between gen 639 and 640; 
it's not "sticky" at all, even though stickiness should have been easy for the 
assignor to achieve if the previous assignment was something relatively 
simple, like each consumer claiming exactly one or two partitions, all from 
the same topic. Something fishy is definitely going on.

The only other thing off the top of my head to check would be that every single 
consumer was configured with (only) the CooperativeStickyAssignor over the full 
period from gen 639 through the end of gen 640, or at least check the group 
leader (consumer-5 I think?) and consumers 3 & 4.
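
For reference, pinning the assignor on every member would look roughly like 
the following (a minimal sketch against the public consumer config API; the 
bootstrap servers and group id are placeholders):
{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;

public class AssignorConfigSketch {
    public static Properties consumerProps() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-id");             // placeholder
        // The cooperative protocol only holds if EVERY member in the group
        // lists exactly this assignor (and nothing else) for the whole period:
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  CooperativeStickyAssignor.class.getName());
        return props;
    }
}
{code}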

I'll take a look at the assignor logic and see if anything jumps out at me on 
my end, but I have to say the complete lack of stickiness in the assignment 
from 639 to 640 is fairly perplexing and something I have never seen before 
with the CooperativeStickyAssignor. There have been some recent bugs related 
to rebalancing edge cases that have been fixed over the past few versions, so I 
may go back over those and see if anything was messed up by them. I did in fact 
discover one bug affecting rebalancing/assignment in the past few months which 
had been introduced by that series of fixes, so perhaps there is another.

> Kafka CooperativeStickyAssignor revokes/assigns partition in one rebalance 
> cycle
> 
>
> Key: KAFKA-14639
> URL: https://issues.apache.org/jira/browse/KAFKA-14639
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 3.2.1
>Reporter: Bojan Blagojevic
>Priority: Major
>
> I have an application that runs 6 consumers in parallel. I am getting some 
> unexpected results when I use {{{}CooperativeStickyAssignor{}}}. If I 
> understand the mechanism correctly, if a consumer loses a partition in one 
> rebalance cycle, the partition should be assigned in the next rebalance cycle.
> This assumption is based on the 
> [RebalanceProtocol|https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.RebalanceProtocol.html]
>  documentation and few blog posts that describe the protocol, like [this 
> one|https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/]
>  on Confluent blog.
> {quote}The assignor should not reassign any owned partitions immediately, but 
> instead may indicate consumers the need for partition revocation so that the 
> revoked partitions can be reassigned to other consumers in the next rebalance 
> event. This is designed for sticky assignment logic which attempts to 
> minimize partition reassignment with cooperative adjustments.
> {quote}
> {quote}Any member that revoked partitions then rejoins the group, triggering 
> a second rebalance so that its revoked partitions can be assigned. Until 
> then, these partitions are unowned and unassigned.
> {quote}
> These are the logs from the application that uses 
> {{{}protocol='cooperative-sticky'{}}}. In the same rebalance cycle 
> ({{{}generationId=640{}}}) {{partition 74}} moves from {{consumer-3}} to 
> {{{}consumer-4{}}}. I omitted the lines that are logged by the other 4 
> consumers.
> Mind that the log is in reverse (bottom to top).
> {code:java}
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : New 
> partition assignment: partition-59, seek to min common offset: 85120524
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler2 : Partitions 
> [partition-59] assigned successfully
> 2022-12-14 11:18:24 1 — [consumer-3] x.y.z.MyRebalanceHandler1 : Partitions 
> assigned: [partition-59]
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Adding newly assigned partitions: partition-59
> 2022-12-14 11:18:24 1 — [consumer-3] o.a.k.c.c.internals.ConsumerCoordinator 
> : [Consumer clientId=partition-3-my-client-id-my-group-id, 
> groupId=my-group-id] Notifying assignor about the new 
> 

[GitHub] [kafka] fvaleri commented on pull request #13136: KAFKA-14582: Move JmxTool to tools

2023-01-24 Thread via GitHub


fvaleri commented on PR #13136:
URL: https://github.com/apache/kafka/pull/13136#issuecomment-1401539053

   @mimaison @clolov @vamossagar12 this is ready for review if you have some 
time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #13156: KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest

2023-01-24 Thread via GitHub


ableegoldman commented on PR #13156:
URL: https://github.com/apache/kafka/pull/13156#issuecomment-1401535688

   Merged to trunk and cherrypicked back to 3.4 since this is trivial and 
rather time-sensitive -- hopefully we can finally narrow down the culprit for 
good and have a better shot at getting a full clean build of 3.4  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-14533) Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

2023-01-24 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17680134#comment-17680134
 ] 

A. Sophie Blee-Goldman commented on KAFKA-14533:


FYI after disabling the `false` parameter I immediately saw another failure, 
which points to the state updater as the culprit after all.

 

I did one final PR to verify this by disabling the `true` build and enabling 
the `false` build again – https://github.com/apache/kafka/pull/13156

> Flaky Test SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
> -
>
> Key: KAFKA-14533
> URL: https://issues.apache.org/jira/browse/KAFKA-14533
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Greg Harris
>Priority: Major
>  Labels: flaky-test
>
> The SmokeTestDriverIntegrationTest appears to be flaky, failing in recent 
> runs:
> ```
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1444/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1443/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1441/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1440/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1438/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
>     
> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/1434/tests/
>         java.util.concurrent.TimeoutException: 
> shouldWorkWithRebalance(boolean) timed out after 600 seconds
> ```
> The stacktrace appears to be:
> ```
> java.util.concurrent.TimeoutException: shouldWorkWithRebalance(boolean) timed 
> out after 600 seconds
>  at 
> org.junit.jupiter.engine.extension.TimeoutExceptionFactory.create(TimeoutExceptionFactory.java:29)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:58)
>  at 
> org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> ...
>  Suppressed: java.lang.InterruptedException: sleep interrupted
>  at java.lang.Thread.sleep(Native Method)
>  at 
> org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance(SmokeTestDriverIntegrationTest.java:151)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
>  at 
> org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>  at 
> org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>  at 
> org.junit.jupiter.engine.extension.SameThreadTimeoutInvocation.proceed(SameThreadTimeoutInvocation.java:45)
>  ... 134 more
> ```
> The test appears to be timing out waiting for the SmokeTestClient to complete 
> its asynchronous close, and taking significantly longer to do so (600s 
> instead of 60s) than a typical local test execution time.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[GitHub] [kafka] ableegoldman merged pull request #13156: KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest

2023-01-24 Thread via GitHub


ableegoldman merged PR #13156:
URL: https://github.com/apache/kafka/pull/13156


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman commented on pull request #13156: KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest

2023-01-24 Thread via GitHub


ableegoldman commented on PR #13156:
URL: https://github.com/apache/kafka/pull/13156#issuecomment-1401529084

   cc @lucasbru @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ableegoldman opened a new pull request, #13156: KAFKA-14533: re-enable 'false' and disable the 'true' parameter of SmokeTestDriverIntegrationTest

2023-01-24 Thread via GitHub


ableegoldman opened a new pull request, #13156:
URL: https://github.com/apache/kafka/pull/13156

   I immediately saw a failure with `stateUpdaterEnabled = true` after 
disabling the `false` parameter, which suggests the problem actually does lie 
in the state updater itself and not in the parametrization of the test. To 
verify this theory, and help stabilize the 3.4 release branch, let's try one 
more test by swapping out the `true` build in favor of the `false` one. If the 
`listOffsets` requests stop failing and causing this integration test to hit 
the global timeout as is currently happening at such a high rate, then we have 
pretty good evidence pointing at the state updater and should be able to debug 
things more easily from there.
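
   For context, the toggle has roughly the following shape (a sketch, not the 
actual test file; only the `@ValueSource` line is the point):

   ```java
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.ValueSource;

   class SmokeTestParamSketch {
       // Keeping only `false` in the source skips the `true` (state updater
       // enabled) run: if the failures persist, the state updater is cleared;
       // if they vanish, it is implicated.
       @ParameterizedTest
       @ValueSource(booleans = {false})
       void shouldWorkWithRebalance(final boolean stateUpdaterEnabled) {
           // real test body omitted; only the parameter toggle matters here
       }
   }
   ```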
   
   After getting in a few builds to see whether the flakiness subsides, we 
should merge this PR to re-enable both parameters going forward: 
https://github.com/apache/kafka/pull/13155


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org