[jira] [Resolved] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation

2024-05-22 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-15041.
---
Resolution: Won't Fix

For now, setting the config `producer.override.max.block.ms` at the connector 
config level or `producer.max.block.ms` at the worker config level to a lower 
value should fix this issue. The problem is that the default value for this 
config is [set to Long.MAX_VALUE|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L820] 
in the configs, and when topics are deleted manually there is really no signal 
received to indicate it. We could add heuristics such as periodically checking 
whether a topic is present and refreshing the cache, or checking the source 
topic metrics to see whether records are just being buffered and not sent, but 
that is outside the scope of the runtime.
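As a concrete illustration of the workaround, a connector config could cap the producer's blocking time as below. The connector name, class, and the 60-second value are hypothetical, and the producer override also requires the worker's `connector.client.config.override.policy` to permit producer overrides:

```json
{
  "name": "my-source-connector",
  "config": {
    "connector.class": "org.example.MySourceConnector",
    "tasks.max": "1",
    "producer.override.max.block.ms": "60000"
  }
}
```

With this, a send against a deleted topic fails after 60 seconds instead of blocking indefinitely, which at least surfaces the problem in task status.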

> Source Connector auto topic creation fails when topic is deleted and brokers 
> don't support auto topic creation
> --
>
> Key: KAFKA-15041
> URL: https://issues.apache.org/jira/browse/KAFKA-15041
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
>   allows source connectors to create topics even when the broker doesn't 
> allow automatic topic creation. It does so by checking, for every record, 
> whether a topic needs to be created 
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]
>  To avoid checking for topic presence via the admin client on every record, it 
> also maintains a cache of the topics it has created and doesn't try to create 
> those again. This makes topic creation work when brokers don't support 
> automatic topic creation.
> However, let's say the topic is created initially and later gets deleted 
> while the connector is still running, and the brokers don't support automatic 
> topic creation. In that case, the connector has cached the topic it already 
> created and won't recreate it, because the cache never updates; and since the 
> broker doesn't support topic creation, the logs just fill up with messages 
> like 
>  
> {code:java}
> Error while fetching metadata with correlation id 3260 : 
> {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code}
>  
> This can become a problem in environments where brokers don't allow topic 
> creation. We need a way to refresh the topics cache for such cases.
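The cache-refresh heuristic mentioned in the resolution could be sketched roughly as follows. This is an illustrative sketch, not actual Connect runtime code; the class and method names are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a created-topics cache with a per-entry TTL, so that a
// manually deleted topic is eventually re-verified instead of being trusted
// forever once it lands in the cache.
public class ExpiringTopicCache {
    private final Map<String, Long> createdAt = new ConcurrentHashMap<>();
    private final long ttlMs;

    public ExpiringTopicCache(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Record that we created (or verified) the topic just now.
    public void markCreated(String topic, long nowMs) {
        createdAt.put(topic, nowMs);
    }

    // Returns true only if the topic was created/verified within the TTL
    // window; once the entry expires it is dropped, and the caller should
    // re-check via the admin client and recreate the topic if it is missing.
    public boolean isFresh(String topic, long nowMs) {
        Long ts = createdAt.get(topic);
        if (ts == null || nowMs - ts > ttlMs) {
            createdAt.remove(topic);
            return false;
        }
        return true;
    }
}
```

In such a scheme, when `isFresh` returns false the task would re-check topic existence (and recreate the topic if needed) before re-marking the entry, at the cost of occasional extra admin-client calls.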



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2024-05-08 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Description: 
When a connector or task tries to read offsets from the offsets topic, it 
invokes the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
from the underlying KafkaBackingStore, which invokes the 
`KafkaBasedLog#readToEnd` method and passes it a Callback. That method 
essentially adds the Callback to a queue of callbacks being managed.

Within KafkaBasedLog, there's a WorkThread which polls the callback queue and 
executes the callbacks in an infinite loop. However, there is an enclosing 
try/catch block around the while loop. If an exception is thrown that is not 
caught by any of the inner catch blocks, control reaches the outermost catch 
block and the WorkThread terminates. The connectors/tasks are not aware of 
this, and they keep submitting callbacks to KafkaBasedLog with nobody 
processing them. This can be seen in the thread dumps as well:

 
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
 {code}
 

We need a mechanism to fail all such offset read requests. Even if we restart 
the thread, chances are it will fail with the same error again, so the offset 
fetch would be stuck perennially.
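One possible shape for such a mechanism, sketched generically (hypothetical names, not the actual KafkaBasedLog code), is to drain and fail every queued future from the outermost catch block before the work thread exits, so callers blocked on get() fail fast instead of waiting forever:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of a work loop that fails all pending read requests
// when an unexpected exception escapes the inner handlers, instead of the
// thread dying silently while callers stay parked on their futures.
public class WorkLoopSketch {
    private final Queue<CompletableFuture<Void>> pendingReads = new ArrayDeque<>();

    // Callers enqueue a future that is normally completed by the work thread.
    public CompletableFuture<Void> readToEnd() {
        CompletableFuture<Void> f = new CompletableFuture<>();
        synchronized (pendingReads) {
            pendingReads.add(f);
        }
        return f;
    }

    // One iteration of the work loop; 'poll' stands in for the real
    // poll-and-complete-callbacks body.
    public void runOnce(Runnable poll) {
        try {
            poll.run();
        } catch (Throwable t) {
            // Outermost catch: the thread is about to die, so fail every
            // queued future rather than leaving callers blocked forever.
            synchronized (pendingReads) {
                CompletableFuture<Void> f;
                while ((f = pendingReads.poll()) != null)
                    f.completeExceptionally(t);
            }
        }
    }
}
```

With this shape, subsequent submissions could also be rejected immediately once the thread is known to be dead.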

As explained above, this scenario happens when the exception thrown isn't 
caught by any of the inner catch blocks and control ends up in the outermost 
catch block. I have seen this happen on a few occasions, for example when the 
exception thrown is:

 
 
{code:java}
[2022-11-20 09:00:59,307] ERROR Unexpected exception in Thread[KafkaBasedLog 
Work Thread - connect-offsets,5,main] 
(org.apache.kafka.connect.util.KafkaBasedLog:440)org.apache.kafka.connect.errors.ConnectException:
 Error while getting end offsets for topic 'connect-offsets' on brokers at XXX
  at 
org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:695)  
  at 
org.apache.kafka.connect.util.KafkaBasedLog.readEndOffsets(KafkaBasedLog.java:371)
  
  at 
org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:332)
  
  at 
org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:75)   
   
  at 
org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:406)
Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
  at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
  at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
  at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
  at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
  at 
org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:672)
  ... 4 more
{code}
At this point, the WorkThread is dead once control exits the [catch 
block|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L608-L610], 
and we can find the line `Unexpected exception in` in the logs.

Another example is when the worker has already run out of memory; in that case 
the work thread would also die. This is not a great example, because once the 
worker is OOM we can't make progress anyway, but it is included for 
completeness.

  was:
When a connector or task tries to read the offsets from the 

[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2024-05-08 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Description: 
When a connector or task tries to read offsets from the offsets topic, it 
invokes the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
from the underlying KafkaBackingStore, which invokes the 
`KafkaBasedLog#readToEnd` method and passes it a Callback. That method 
essentially adds the Callback to a queue of callbacks being managed.

Within KafkaBasedLog, there's a WorkThread which polls the callback queue and 
executes the callbacks in an infinite loop. However, there is an enclosing 
try/catch block around the while loop. If an exception is thrown that is not 
caught by any of the inner catch blocks, control reaches the outermost catch 
block and the WorkThread terminates. The connectors/tasks are not aware of 
this, and they keep submitting callbacks to KafkaBasedLog with nobody 
processing them. This can be seen in the thread dumps as well:

 
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
 {code}
 

We need a mechanism to fail all such offset read requests. Even if we restart 
the thread, chances are it will fail with the same error again, so the offset 
fetch would be stuck perennially.

As explained above, this scenario happens when the exception thrown isn't 
caught by any of the inner catch blocks and control ends up in the outermost 
catch block. I have seen this happen on a few occasions, for example when the 
exception thrown is:

 
 
{code:java}
Caused by: java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
  at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
  at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
  at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
  at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
  at 
org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:672)
  ... 4 more
{code}
At this point, the WorkThread is dead once control exits the [catch 
block|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L608-L610], 
and we can find the line `Unexpected exception in` in the logs.

Another example is when the worker has already run out of memory; in that case 
the work thread would also die. This is not a great example, because once the 
worker is OOM we can't make progress anyway, but it is included for 
completeness.

  was:
When a connector or task tries to read the offsets from the offsets topic, it 
issues `OffsetStorageImpl#offsets` method. This method gets a Future from the 
underneath KafkaBackingStore. KafkaBackingStore invokes 
`KafkaBasedLog#readToEnd` method and passes the Callback. This method 
essentially adds the Callback to a Queue of callbacks that are being managed.

Within KafkaBasedLog, there's a WorkThread which keeps polling over the 
callback queue and executes them and it does this in an infinite loop. However, 
there is an enclosing try/catch block around the while loop. If there's an 
exception thrown which is not caught by any of the other catch blocks, the 
control goes to the outermost catch block and the WorkThread is terminated. 
However, the connectors/tasks are not aware of this and they would keep 
submitting 

[jira] [Commented] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840163#comment-17840163
 ] 

Sagar Rao commented on KAFKA-16604:
---

[~chia7712], ok, I have assigned it to myself.

> Deprecate ConfigDef.ConfigKey constructor from public APIs
> --
>
> Key: KAFKA-16604
> URL: https://issues.apache.org/jira/browse/KAFKA-16604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Currently, one can create a ConfigKey either by invoking the public 
> constructor directly and passing the result to a ConfigDef object, or by 
> invoking one of the define methods. The two ways can get confusing at times. 
> Moreover, it can lead to errors, as was noticed in KAFKA-16592.
> We should ideally expose only one way to users, which IMO should be creating 
> the objects only through the exposed define methods. This ticket is about 
> marking the public constructor of ConfigKey as Deprecated first and then 
> making it private eventually.





[jira] [Assigned] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-16604:
-

Assignee: Sagar Rao

> Deprecate ConfigDef.ConfigKey constructor from public APIs
> --
>
> Key: KAFKA-16604
> URL: https://issues.apache.org/jira/browse/KAFKA-16604
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Currently, one can create a ConfigKey either by invoking the public 
> constructor directly and passing the result to a ConfigDef object, or by 
> invoking one of the define methods. The two ways can get confusing at times. 
> Moreover, it can lead to errors, as was noticed in KAFKA-16592.
> We should ideally expose only one way to users, which IMO should be creating 
> the objects only through the exposed define methods. This ticket is about 
> marking the public constructor of ConfigKey as Deprecated first and then 
> making it private eventually.





[jira] [Created] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16604:
-

 Summary: Deprecate ConfigDef.ConfigKey constructor from public APIs
 Key: KAFKA-16604
 URL: https://issues.apache.org/jira/browse/KAFKA-16604
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao


Currently, one can create a ConfigKey either by invoking the public constructor 
directly and passing the result to a ConfigDef object, or by invoking one of 
the define methods. The two ways can get confusing at times. Moreover, it can 
lead to errors, as was noticed in KAFKA-16592.

We should ideally expose only one way to users, which IMO should be creating 
the objects only through the exposed define methods. This ticket is about 
marking the public constructor of ConfigKey as Deprecated first and then making 
it private eventually.
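The single-creation-path idea can be illustrated with a generic sketch. The classes and methods below are hypothetical, not the actual ConfigDef API: the key type's constructor is hidden, so the define method is the only public way to create and register entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the "one way to create" pattern this ticket proposes:
// Key's constructor is private, so callers must go through define().
public class DefSketch {
    public static final class Key {
        public final String name;
        public final String doc;

        private Key(String name, String doc) { // not publicly constructible
            this.name = name;
            this.doc = doc;
        }
    }

    private final Map<String, Key> keys = new LinkedHashMap<>();

    // The only public entry point; it builds the key and registers it in one
    // step, so a key can never exist detached from its definition.
    public DefSketch define(String name, String doc) {
        keys.put(name, new Key(name, doc));
        return this;
    }

    public Key key(String name) {
        return keys.get(name);
    }
}
```

Deprecating the public constructor first, as the ticket suggests, lets existing callers migrate to the define path before the constructor is made private.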





[jira] [Created] (KAFKA-16592) ConfigKey constructor update can break clients using it

2024-04-20 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16592:
-

 Summary: ConfigKey constructor update can break clients using it
 Key: KAFKA-16592
 URL: https://issues.apache.org/jira/browse/KAFKA-16592
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


In [KAFKA-14957|https://issues.apache.org/jira/browse/KAFKA-14957], the 
constructor of ConfigDef.ConfigKey was updated to add a new argument called 
{*}alternativeString{*}. As part of the PR, new *define* methods were also 
added, which makes sense. However, since the constructor of 
*ConfigDef.ConfigKey* itself can be used directly by other clients that import 
the dependency, this can break all clients that were using the older 
constructor without the *alternativeString* argument.

I bumped into this while testing the 
[kafka-connect-redis|https://github.com/jcustenborder/kafka-connect-redis/tree/master]
 connector. It starts up correctly against the official 3.7 release, but fails 
with the following error when run against a 3.8 snapshot:
{code:java}
Caused by: java.lang.NoSuchMethodError: 
org.apache.kafka.common.config.ConfigDef$ConfigKey.(Ljava/lang/String;Lorg/apache/kafka/common/config/ConfigDef$Type;Ljava/lang/Object;Lorg/apache/kafka/common/config/ConfigDef$Validator;Lorg/apache/kafka/common/config/ConfigDef$Importance;Ljava/lang/String;Ljava/lang/String;ILorg/apache/kafka/common/config/ConfigDef$Width;Ljava/lang/String;Ljava/util/List;Lorg/apache/kafka/common/config/ConfigDef$Recommender;Z)V
 at 
com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder.build(ConfigKeyBuilder.java:62)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisConnectorConfig.config(RedisConnectorConfig.java:133)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisSinkConnectorConfig.config(RedisSinkConnectorConfig.java:46)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector.config(RedisSinkConnector.java:73)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:538)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$3(AbstractHerder.java:412)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 ... 1 more
 
{code}
 

The reason is that the connector uses another library called connect-utils, 
which invokes the old constructor 
[directly|https://github.com/jcustenborder/connect-utils/blob/master/connect-utils/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/ConfigKeyBuilder.java#L62].

Connector invocations are not expected to fail across versions like this, so 
it would cause confusion.

One could argue about why the constructor is being invoked directly instead of 
using the *define* method, but there might be other clients doing the same. We 
should add the old constructor back and have it call the new one with 
*alternativeString* set to null.
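The proposed fix is the standard binary-compatibility pattern for a constructor that gains a parameter: keep the old signature and have it delegate to the new one with the added argument defaulted. A generic sketch (hypothetical class, not the actual ConfigKey signature):

```java
// Hypothetical sketch of preserving binary compatibility when a constructor
// gains a parameter: the old signature stays and delegates to the new one.
public class KeySketch {
    public final String name;
    public final String alternativeString;

    // New constructor with the added argument.
    public KeySketch(String name, String alternativeString) {
        this.name = name;
        this.alternativeString = alternativeString;
    }

    // Old constructor kept so previously compiled callers still link;
    // removing it surfaces as NoSuchMethodError at runtime, since the JVM
    // resolves the exact constructor descriptor baked into the caller's
    // class file.
    public KeySketch(String name) {
        this(name, null);
    }
}
```

Recompiling a caller against the new class would also fix the linkage, but that is not an option for already-released connector jars, which is why retaining the old constructor matters.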





[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609
 ] 

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 3:02 PM:


[~kirktrue], I am running the system tests for connect using the test suite on 
my local machine, and this exact test *test_exactly_once_source* fails 
regularly for me with a different config.
{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in __init__
self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", 
line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", 
line 37, in do_alloc
good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", 
line 131, in remove_spec
raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes 
requested: 1. linux nodes available: 0
{code}
-If you want, I can revert the change as part of my PR. It should be ready for 
review by today or tomorrow.-

I updated the PR [https://github.com/apache/kafka/pull/15594] with the changes 
rolled back for the test in question. I hope it's ok.


was (Author: sagarrao):
[~kirktrue], I am running the the system tests for connect using the test suite 
on my local, and this exact test *test_exactly_once_source* fails regularly for 
me for a different config.
{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, 

[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609
 ] 

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 11:11 AM:
-

[~kirktrue], I am running the system tests for connect using the test suite on 
my local machine, and this exact test *test_exactly_once_source* fails 
regularly for me with a different config.
{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in __init__
self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", 
line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", 
line 37, in do_alloc
good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", 
line 131, in remove_spec
raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes 
requested: 1. linux nodes available: 0
{code}
If you want, I can revert the change as part of my PR. It should be ready for 
review by today or tomorrow.


was (Author: sagarrao):
[~kirktrue], I am working on modifying the  the system tests for connect, and 
this exact test *test_exactly_once_source* fails regularly for me for a 
different config. 


{code:java}

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in 

[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609
 ] 

Sagar Rao edited comment on KAFKA-16578 at 4/18/24 11:10 AM:
-

[~kirktrue], I am working on modifying the system tests for connect, and this 
exact test *test_exactly_once_source* fails regularly for me with a different 
config.


{code:java}

test_id:
{code}



[jira] [Commented] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer

2024-04-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609
 ] 

Sagar Rao commented on KAFKA-16578:
---

[~kirktrue], I am working on modifying the system tests for connect, and this 
exact test `test_exactly_once_source` fails regularly for me for a different 
config. 
```

test_id:
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time:   7 minutes 59.232 seconds


InsufficientResourcesError('linux nodes requested: 1. linux nodes 
available: 0')
Traceback (most recent call last):
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
186, in _do_run
data = self.run_test()
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 
246, in run_test
return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 
433, in wrapper
return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File 
"/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", 
line 929, in test_exactly_once_source
consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, 
self.source.topic, consumer_timeout_ms=1000, print_key=True, 
consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, 
in __init__
BackgroundThreadService.__init__(self, context, num_nodes)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py",
 line 26, in __init__
super(BackgroundThreadService, self).__init__(context, num_nodes, 
cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 107, in __init__
self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", 
line 217, in allocate_nodes
self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", 
line 54, in alloc
allocated = self.do_alloc(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", 
line 37, in do_alloc
good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File 
"/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", 
line 131, in remove_spec
raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes 
requested: 1. linux nodes available: 0
```

If you want, I can revert the change as part of my PR. It should be ready for 
review by today or tomorrow.

> Revert changes to connect_distributed_test.py for the new async Consumer
> 
>
> Key: KAFKA-16578
> URL: https://issues.apache.org/jira/browse/KAFKA-16578
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer, system tests
>Affects Versions: 3.8.0
>Reporter: Kirk True
>Assignee: Kirk True
>Priority: Critical
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated 
> a slew of system tests to run both the "old" and "new" implementations. 
> KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it 
> could test the new consumer with Connect. However, we are not supporting 
> Connect with the new consumer in 3.8.0. Unsurprisingly, when we run the 
> Connect system tests with the new {{AsyncKafkaConsumer}}, we get errors like 
> the following:
> {code}
> test_id:
> kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
> status: FAIL
> run time:   6 minutes 3.899 seconds
> InsufficientResourcesError('Not enough nodes available to allocate. linux 
> nodes requested: 1. linux nodes available: 0')
> Traceback (most recent call last):
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 184, in _do_run
> data = self.run_test()
>   File 
> "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py",
>  line 262, in run_test
> return self.test_context.function(self.test)
>   File 
> 

[jira] [Assigned] (KAFKA-16481) Fix flaky kafka.server.ReplicaManagerTest#testRemoteLogReaderMetrics

2024-04-06 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-16481:
-

Assignee: Sagar Rao

> Fix flaky kafka.server.ReplicaManagerTest#testRemoteLogReaderMetrics
> 
>
> Key: KAFKA-16481
> URL: https://issues.apache.org/jira/browse/KAFKA-16481
> Project: Kafka
>  Issue Type: Test
>Reporter: Chia-Ping Tsai
>Assignee: Sagar Rao
>Priority: Minor
>
> {quote}
> org.opentest4j.AssertionFailedError: expected:  but was: 
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>   at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>   at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:183)
>   at 
> app//kafka.server.ReplicaManagerTest.testRemoteLogReaderMetrics(ReplicaManagerTest.scala:4089)
>   at 
> java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
>   at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580)
>   at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728)
>   at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
>   at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
>   at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
>   at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139)
>   at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
>   at 
> app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
>   at 
> app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
>   at 
> app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
>   at 

[jira] [Commented] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-04-01 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832894#comment-17832894
 ] 

Sagar Rao commented on KAFKA-16272:
---

[~kirktrue], thanks, I changed the status to In Progress.

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.
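As a rough illustration of the change described above, here is a plain-Python 
sketch (function and parameter names are hypothetical, not the actual 
kafkatest helpers) of how an optional group_protocol argument can flow into 
the consumer properties a test passes along, leaving existing runs unchanged:

```python
# Hypothetical sketch: group_protocol defaults to None so existing
# (classic-protocol) test runs are untouched; when the test matrix
# supplies a value, it is forwarded as the KIP-848 "group.protocol"
# consumer property.
def build_consumer_properties(group_protocol=None):
    props = {}
    if group_protocol is not None:
        props["group.protocol"] = group_protocol
    return props

print(build_consumer_properties())            # classic path: no override
print(build_consumer_properties("consumer"))  # new KIP-848 protocol
```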



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config

2024-03-18 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-16272:
-

Assignee: Sagar Rao

> Update connect_distributed_test.py to support KIP-848’s group protocol config
> -
>
> Key: KAFKA-16272
> URL: https://issues.apache.org/jira/browse/KAFKA-16272
> Project: Kafka
>  Issue Type: Test
>  Components: clients, consumer, system tests
>Affects Versions: 3.7.0
>Reporter: Kirk True
>Assignee: Sagar Rao
>Priority: Blocker
>  Labels: kip-848-client-support, system-tests
> Fix For: 3.8.0
>
>
> This task is to update the test method(s) in {{connect_distributed_test.py}} 
> to support the {{group.protocol}} configuration introduced in 
> [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol]
>  by adding an optional {{group_protocol}} argument to the tests and matrixes.
> See KAFKA-16231 as an example of how the test parameters can be changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.

2024-01-26 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-16197:
--
Description: 
When a Connect worker's poll timeout expires, the log lines that we see are:

{noformat}
consumer poll timeout has expired. This means the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically 
implies that the poll loop is spending too much time processing messages. You 
can address this either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches returned in poll() with max.poll.records.

{noformat}

and the reason for leaving the group is 


{noformat}
Member XX sending LeaveGroup request to coordinator XX due to consumer poll 
timeout has expired.
{noformat}

which is specific to Consumers and not to Connect workers. The log line above 
is especially misleading because the config `max.poll.interval.ms` is not 
configurable for a Connect worker and could make someone believe that the logs 
are being written for Sink Connectors and not for the Connect worker. Ideally, 
we should print something specific to Connect.


  was:
When a worker's poll timeout expires in Connect, the log lines that we see are:

{noformat}
consumer poll timeout has expired. This means the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically 
implies that the poll loop is spending too much time processing messages. You 
can address this either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches returned in poll() with max.poll.records.

{noformat}

and the reason for leaving the group is 


{noformat}
Member XX sending LeaveGroup request to coordinator XX due to consumer poll 
timeout has expired.
{noformat}

which is specific to Consumers and not to Connect workers. The log line above 
is especially misleading because the config `max.poll.interval.ms` is not 
configurable for a Connect worker and could make someone believe that the logs 
are being written for Sink Connectors and not for the Connect worker. Ideally, 
we should print something specific to Connect.



> Connect Worker poll timeout prints Consumer poll timeout specific warnings.
> ---
>
> Key: KAFKA-16197
> URL: https://issues.apache.org/jira/browse/KAFKA-16197
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> When a Connect worker's poll timeout expires, the log lines that we see are:
> {noformat}
> consumer poll timeout has expired. This means the time between subsequent 
> calls to poll() was longer than the configured max.poll.interval.ms, which 
> typically implies that the poll loop is spending too much time processing 
> messages. You can address this either by increasing max.poll.interval.ms or 
> by reducing the maximum size of batches returned in poll() with 
> max.poll.records.
> {noformat}
> and the reason for leaving the group is 
> {noformat}
> Member XX sending LeaveGroup request to coordinator XX due to consumer poll 
> timeout has expired.
> {noformat}
> which is specific to Consumers and not to Connect workers. The log line above 
> is especially misleading because the config `max.poll.interval.ms` is not 
> configurable for a Connect worker and could make someone believe that the 
> logs are being written for Sink Connectors and not for the Connect worker. 
> Ideally, we should print something specific to Connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.

2024-01-26 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-16197:
--
Component/s: connect

> Connect Worker poll timeout prints Consumer poll timeout specific warnings.
> ---
>
> Key: KAFKA-16197
> URL: https://issues.apache.org/jira/browse/KAFKA-16197
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> When a worker's poll timeout expires in Connect, the log lines that we see 
> are:
> {noformat}
> consumer poll timeout has expired. This means the time between subsequent 
> calls to poll() was longer than the configured max.poll.interval.ms, which 
> typically implies that the poll loop is spending too much time processing 
> messages. You can address this either by increasing max.poll.interval.ms or 
> by reducing the maximum size of batches returned in poll() with 
> max.poll.records.
> {noformat}
> and the reason for leaving the group is 
> {noformat}
> Member XX sending LeaveGroup request to coordinator XX due to consumer poll 
> timeout has expired.
> {noformat}
> which is specific to Consumers and not to Connect workers. The log line above 
> is especially misleading because the config `max.poll.interval.ms` is not 
> configurable for a Connect worker and could make someone believe that the 
> logs are being written for Sink Connectors and not for the Connect worker. 
> Ideally, we should print something specific to Connect.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.

2024-01-26 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16197:
-

 Summary: Connect Worker poll timeout prints Consumer poll timeout 
specific warnings.
 Key: KAFKA-16197
 URL: https://issues.apache.org/jira/browse/KAFKA-16197
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


When a worker's poll timeout expires in Connect, the log lines that we see are:

{noformat}
consumer poll timeout has expired. This means the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically 
implies that the poll loop is spending too much time processing messages. You 
can address this either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches returned in poll() with max.poll.records.

{noformat}

and the reason for leaving the group is 


{noformat}
Member XX sending LeaveGroup request to coordinator XX due to consumer poll 
timeout has expired.
{noformat}

which is specific to Consumers and not to Connect workers. The log line above 
is especially misleading because the config `max.poll.interval.ms` is not 
configurable for a Connect worker and could make someone believe that the logs 
are being written for Sink Connectors and not for the Connect worker. Ideally, 
we should print something specific to Connect.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16039) RecordHeaders supports the addAll method

2023-12-30 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-16039:
--
Labels: need-kip  (was: )

> RecordHeaders supports the addAll method
> 
>
> Key: KAFKA-16039
> URL: https://issues.apache.org/jira/browse/KAFKA-16039
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Jianbin Chen
>Priority: Minor
>  Labels: need-kip
>
> Why not provide an addAll method in RecordHeaders? This will help reduce the 
> amount of code required to copy between headers
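For illustration, a minimal plain-Python model of the convenience being 
requested (RecordHeaders is a Java class, so this only sketches the idea; the 
class and method names here are illustrative, and the real change would need a 
KIP):

```python
# Today, copying headers means a caller-side loop of add() calls; an
# addAll()-style method would collapse that into one call.
class Headers:
    def __init__(self):
        self._entries = []

    def add(self, key, value):
        self._entries.append((key, value))

    def add_all(self, other):
        # The proposed convenience: bulk-copy another Headers instance.
        for key, value in other._entries:
            self.add(key, value)

source = Headers()
source.add("trace-id", b"abc123")
source.add("origin", b"svc-a")

target = Headers()
target.add_all(source)            # one call instead of a caller-side loop
print(target._entries)
```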



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16056) Worker poll timeout expiry can lead to Duplicate task assignments.

2023-12-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16056:
-

 Summary: Worker poll timeout expiry can lead to Duplicate task 
assignments.
 Key: KAFKA-16056
 URL: https://issues.apache.org/jira/browse/KAFKA-16056
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


When a worker's poll timeout expires, it triggers a rebalance because the 
worker proactively leaves the group. Under normal scenarios, leaving the group 
would trigger a scheduled rebalance delay. However, one thing to note is that 
the worker which left the group temporarily doesn't give up its assignments, 
and whatever tasks were running on it remain as is. When the scheduled 
rebalance delay elapses, it simply gets back its assignments, and given that 
there won't be any revocations, it should all work out fine.

But there is an edge case here. Let's assume that a scheduled rebalance delay 
was already active on a group and, just before the follow-up rebalance due to 
that delay elapsing, one of the workers' poll timeouts expires. At this point, 
a rebalance is imminent and the leader would track the assignments of the 
transiently departed worker as lost 
[here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255].
 When 
[handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441]
 gets triggered, because the scheduled rebalance delay isn't reset yet, and if 
[this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473]
 condition passes, the leader would assume that it needs to reassign all the 
lost assignments, which it will.

But because the worker whose poll timeout expired doesn't rescind its 
assignments, we would end up with duplicate assignments: one set on the 
original worker which was already running the tasks and connectors, and 
another set on the remaining group of workers which got the redistributed 
work. This could lead to task failures if a connector has been written in a 
way which expects no duplicate tasks running across a cluster.

Also, this edge case can be encountered more frequently if the 
`rebalance.timeout.ms` config is set to a lower value. 

One of the approaches could be to do something similar to 
https://issues.apache.org/jira/browse/KAFKA-9184 where, upon coordinator 
discovery failure, the worker gives up all its assignments and rejoins with an 
empty assignment. We could do something similar in this case as well.
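To make the edge case concrete, here is a small self-contained Python 
simulation (not Connect code; the worker and task names are made up) of what 
happens when the transiently departed worker keeps its tasks while the leader 
redistributes them as lost:

```python
# Minimal sketch of the edge case: a worker whose poll timeout expires
# leaves the group but keeps its running tasks, while the leader treats
# those tasks as "lost" and, once the scheduled rebalance delay elapses,
# redistributes them to the rest of the group -- yielding duplicates.

def redistribute_lost(lost_tasks, remaining_workers):
    """Round-robin the lost tasks over the remaining workers."""
    assignments = {w: [] for w in remaining_workers}
    for i, task in enumerate(sorted(lost_tasks)):
        assignments[remaining_workers[i % len(remaining_workers)]].append(task)
    return assignments

# Worker w1's poll timeout expires just before the delayed rebalance fires.
running = {"w1": ["t1", "t2"], "w2": ["t3"]}   # w1 keeps running t1, t2
lost = set(running["w1"])                       # leader sees t1, t2 as lost
new = redistribute_lost(lost, ["w2"])           # leader reassigns them to w2

# t1 and t2 now run both on w1 (never revoked) and on w2 (reassigned).
duplicates = set(running["w1"]) & set(new["w2"])
print(sorted(duplicates))
```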




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individ

2023-12-13 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796108#comment-17796108
 ] 

Sagar Rao commented on KAFKA-14454:
---

Aah ok, no problem at all :) Good to know that it worked and that we don't need 
to reopen this ticket.

> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  passes when run individually but not when is run as part of the IT
> --
>
> Key: KAFKA-14454
> URL: https://issues.apache.org/jira/browse/KAFKA-14454
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Newly added test 
> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  as part of KIP-837 passes when run individually but fails when run as part 
> of the IT class, and hence is marked as Ignored. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individ

2023-12-13 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796095#comment-17796095
 ] 

Sagar Rao commented on KAFKA-14454:
---

[~cadonna], I don't recall the error, and I tried to find it but couldn't. I 
should have added the actual error as part of this ticket. We could reopen 
this, but I didn't see this test failing on Jenkins either. 

> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
>  passes when run individually but not when is run as part of the IT
> --
>
> Key: KAFKA-14454
> URL: https://issues.apache.org/jira/browse/KAFKA-14454
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> Newly added test 
> KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
> as part of KIP-837 passes when run individually but fails when run as part 
> of the IT class, and hence is marked as Ignored. 
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10873) Inaccurate "Ignoring stop request for unowned connector" log messages

2023-11-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787561#comment-17787561
 ] 

Sagar Rao commented on KAFKA-10873:
---

FWIW, we hit a scenario today where the same sequence of logs keeps getting 
emitted.

> Inaccurate "Ignoring stop request for unowned connector" log messages
> -
>
> Key: KAFKA-10873
> URL: https://issues.apache.org/jira/browse/KAFKA-10873
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Priority: Major
>
> If a connector fails during startup, it will never be added to the worker's 
> internal list of running connectors (see 
> [https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L298]),
>  and the same happens for tasks (see 
> [https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L570]).
>  
> This leads to the following {{WARN}}-level log messages when that 
> connector/task is scheduled to be stopped by the worker:
>  * [Ignoring stop request for unowned connector 
> |https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L390]
>  * [Ignoring await stop request for non-present connector 
> |https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L415]
>  * [Ignoring stop request for unowned task 
> |https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L832]
>  * [Ignoring await stop request for non-present task 
> |https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L862]
> If the connector/task failed on startup, there should already be log messages 
> detailing the failure and its cause; there is no need to emit warning 
> messages about not stopping that connector when it is scheduled for shutdown. 
> Even worse, emitting these messages may cause users to believe that their 
> cluster is experiencing a rebalancing bug that is somehow causing 
> connectors/tasks that are not assigned to a worker to be revoked from it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15770) org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy is flaky

2023-11-16 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786693#comment-17786693
 ] 

Sagar Rao commented on KAFKA-15770:
---

Noticed an instance here: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14777/1/tests

> org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy
>  is flaky 
> ---
>
> Key: KAFKA-15770
> URL: https://issues.apache.org/jira/browse/KAFKA-15770
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Alok Thatikunta
>Priority: Major
>
> Test fails on CI, passes locally
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldHaveSamePositionBoundActiveAndStandBy/]
> {code:java}
> java.lang.AssertionError: 
> Result:SucceededQueryResult{result=<0,1698511250443>, executionInfo=[], 
> position=Position{position={input-topic={0=50
> Expected: is 
>  but: was  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-15687) Update host address for the GoupMetadata when replace static members

2023-10-26 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779863#comment-17779863
 ] 

Sagar Rao commented on KAFKA-15687:
---

Thanks for filing this bug [~LucentWong]. You have pointed to the correct piece 
of code, and we indeed aren't updating the clientId. While it does seem 
confusing to see the same client-id even after a static member gets replaced, I 
believe the member id does change upon replacement. Do you think that is 
sufficient for this case? Or does the client id hold any particular importance 
in your use case, making it mandatory to change?

> Update host address for the GoupMetadata when replace static members
> 
>
> Key: KAFKA-15687
> URL: https://issues.apache.org/jira/browse/KAFKA-15687
> Project: Kafka
>  Issue Type: Improvement
>  Components: group-coordinator
>Affects Versions: 3.6.0
>Reporter: Yu Wang
>Priority: Major
>
> We are trying to use the static membership protocol for our consumers in 
> Kubernetes. When a pod is recreated, we found that the host address in the 
> group description does not change to the address of the newly created pod.
> For example, we have one pod with *group.instance.id = id1 and ip = 
> 192.168.0.1*. When the pod crashes, we replace it with a new pod with the 
> same *group.instance.id = id1* but a different {*}ip = 192.168.0.2{*}. After 
> the new pod joins the consumer group, the "describe group" command still 
> shows the host as {*}192.168.0.1{*}. This makes it impossible to find the 
> correct consumer instance when investigating an issue.
> After reading the source code, we found that the groupCoordinator does not 
> change the host address for the same {*}groupInstanceId{*}.
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L316]
> Is it also possible to replace the host address when replacing the static 
> member?
>  
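The reported behavior can be simulated with a short sketch. This is an illustrative model only (all names such as `add_or_replace_static_member` are hypothetical), not the actual `GroupMetadata` code:

```python
# Simulation of the report: when a static member (same group.instance.id)
# rejoins from a new host, only the member id is swapped and the host recorded
# at first join is left untouched, so "describe group" still shows the old IP.
# Hypothetical names; this is not the actual coordinator implementation.

class GroupMetadata:
    def __init__(self):
        self.static_members = {}  # group.instance.id -> {"member_id", "host"}

    def add_or_replace_static_member(self, instance_id, member_id, host):
        existing = self.static_members.get(instance_id)
        if existing is not None:
            # Reported issue: the host field is never refreshed on replacement.
            existing["member_id"] = member_id
        else:
            self.static_members[instance_id] = {"member_id": member_id,
                                                "host": host}

group = GroupMetadata()
group.add_or_replace_static_member("id1", "member-A", "192.168.0.1")
group.add_or_replace_static_member("id1", "member-B", "192.168.0.2")  # new pod
print(group.static_members["id1"]["host"])  # still 192.168.0.1
```

The fix the reporter asks about would simply also assign `host` in the replacement branch.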





[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-26 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779839#comment-17779839
 ] 

Sagar Rao commented on KAFKA-13152:
---

Thanks [~ableegoldman], there are merge conflicts which need resolution. I will 
ping you once those are resolved. It being third in your pecking order of KIPs 
is fine with me. And yes, the config-related changes have been merged and 
released; only the actual buffering part remains.

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to buffer per partition; when it is exceeded, we pause 
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the 
> dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider replacing that config with a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .
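The difference between the two bounds can be sketched as follows. This is an illustrative simulation (the `InputBuffer` class and its method names are hypothetical), not the Kafka Streams implementation:

```python
# Sketch of a single global byte bound across all assigned partitions,
# replacing a per-partition record-count bound. With a global bound,
# back-pressure no longer depends on how many partitions happen to be
# assigned or on record size. Hypothetical names throughout.

class InputBuffer:
    def __init__(self, max_bytes):
        self.max_bytes = max_bytes  # global bound, e.g. input.buffer.max.bytes
        self.buffered = {}          # partition -> list of raw record bytes

    def total_bytes(self):
        return sum(len(r) for recs in self.buffered.values() for r in recs)

    def add(self, partition, raw_record):
        self.buffered.setdefault(partition, []).append(raw_record)

    def should_pause_fetching(self):
        # Pause consuming once the total buffered bytes reach the bound.
        return self.total_bytes() >= self.max_bytes

buf = InputBuffer(max_bytes=1024)
buf.add(0, b"x" * 600)   # 600 bytes buffered: keep fetching
buf.add(1, b"y" * 600)   # 1200 bytes buffered: pause
print(buf.should_pause_fetching())  # True
```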





[jira] [Assigned] (KAFKA-13976) Improvements for OpenAPI specs

2023-10-18 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13976:
-

Assignee: (was: Sagar Rao)

> Improvements for OpenAPI specs
> --
>
> Key: KAFKA-13976
> URL: https://issues.apache.org/jira/browse/KAFKA-13976
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Mickael Maison
>Priority: Major
>
> Via KAFKA-13780, we're adding support for generating an OpenAPI spec for the 
> Connect REST API.
> To improve the spec we should:
> - Avoid returning Response and instead have types with a structure
> - Document all response codes





[jira] [Assigned] (KAFKA-15237) Implement write operation timeout

2023-10-16 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15237:
-

Assignee: Sagar Rao  (was: David Jacot)

> Implement write operation timeout
> -
>
> Key: KAFKA-15237
> URL: https://issues.apache.org/jira/browse/KAFKA-15237
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Sagar Rao
>Priority: Major
>
> In the scala code, we rely on `offsets.commit.timeout.ms` to bound all the 
> writes. We should do the same in the new code. This is important to ensure 
> that the number of pending responses in the purgatory is bounded. The name of 
> the config is not ideal, but we should keep it for backward compatibility 
> reasons.
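The timeout-bounded purgatory described above can be sketched as follows. This is a minimal illustration in the spirit of `offsets.commit.timeout.ms` (the `WritePurgatory` class and its names are hypothetical), not the actual group-coordinator code:

```python
# Sketch: each pending write carries a deadline; expired entries are
# completed exceptionally so the set of pending responses stays bounded.
# Hypothetical names; not the actual coordinator implementation.

import time

class WritePurgatory:
    def __init__(self, timeout_ms):
        self.timeout_ms = timeout_ms
        self.pending = []  # list of (deadline_ms, callback)

    def add(self, callback, now_ms=None):
        now_ms = time.time() * 1000 if now_ms is None else now_ms
        self.pending.append((now_ms + self.timeout_ms, callback))

    def expire(self, now_ms):
        still_pending = []
        for deadline, cb in self.pending:
            if now_ms >= deadline:
                cb("REQUEST_TIMED_OUT")  # complete exceptionally
            else:
                still_pending.append((deadline, cb))
        self.pending = still_pending

errors = []
purgatory = WritePurgatory(timeout_ms=5000)
purgatory.add(errors.append, now_ms=0)
purgatory.expire(now_ms=6000)  # past the 5s deadline
print(errors)  # ['REQUEST_TIMED_OUT']
```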





[jira] [Assigned] (KAFKA-15454) Add support for OffsetCommit version 9 in admin client

2023-10-16 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15454:
-

Assignee: Sagar Rao

> Add support for OffsetCommit version 9 in admin client
> --
>
> Key: KAFKA-15454
> URL: https://issues.apache.org/jira/browse/KAFKA-15454
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Sagar Rao
>Priority: Minor
>  Labels: kip-848, kip-848-client-support, kip-848-preview
>
> We need to handle the new error codes as specified here:
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitResponse.json#L46]





[jira] [Assigned] (KAFKA-15457) Add support for OffsetFetch version 9 in admin

2023-10-16 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15457:
-

Assignee: Sagar Rao

> Add support for OffsetFetch version 9 in admin
> --
>
> Key: KAFKA-15457
> URL: https://issues.apache.org/jira/browse/KAFKA-15457
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: David Jacot
>Assignee: Sagar Rao
>Priority: Minor
>  Labels: kip-848, kip-848-client-support, kip-848-preview
>






[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-16 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775716#comment-17775716
 ] 

Sagar Rao commented on KAFKA-13152:
---

Makes sense Stanislav! If we can have experts from Streams review this, I 
would be more than happy to revive it. 

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to buffer per partition; when it is exceeded, we pause 
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the 
> dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider replacing that config with a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .





[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-16 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775701#comment-17775701
 ] 

Sagar Rao commented on KAFKA-13152:
---

[~enether], that's correct.. the reviews are pending for this. I saw the PR and 
there are merge conflicts as well. Will need to resolve those..

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to buffer per partition; when it is exceeded, we pause 
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the 
> dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider replacing that config with a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .





[jira] [Comment Edited] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

2023-10-16 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775701#comment-17775701
 ] 

Sagar Rao edited comment on KAFKA-13152 at 10/16/23 11:15 AM:
--

[~enether], that's correct.. the reviews are pending for this. I saw the PR and 
there are merge conflicts as well. I will need to resolve those..


was (Author: sagarrao):
[~enether], that's correct.. the reviews are pending for this. I saw the PR and 
there are merge conflicts as well. Will need to resolve those..

> Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with 
> "{statestore.cache}/{input.buffer}.max.bytes"
> -
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: kip
> Fix For: 3.7.0
>
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to buffer per partition; when it is exceeded, we pause 
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the 
> dynamic number of partitions assigned.
> * Record size can vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider replacing that config with a global one, e.g. "input.buffer.max.bytes", 
> which controls how many bytes in total are allowed to be buffered. This is 
> doable since we buffer the raw records in .





[jira] [Commented] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky

2023-10-05 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772105#comment-17772105
 ] 

Sagar Rao commented on KAFKA-15020:
---

I found the original stacktrace is showing up in one of the recent builds: 
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14432/8/tests/

> integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor
>  test is flaky
> --
>
> Key: KAFKA-15020
> URL: https://issues.apache.org/jira/browse/KAFKA-15020
> Project: Kafka
>  Issue Type: Test
>  Components: unit tests
>Reporter: Atul Sharma
>Priority: Major
>  Labels: flaky-test
>
> Sometimes the test fails with the following log:
> {code:java}
> Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > 
> FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED
> org.opentest4j.AssertionFailedError: Consumed 0 records before timeout 
> instead of the expected 2 records
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
> at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215)
>  at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244)
> {code}





[jira] [Assigned] (KAFKA-14516) Implement static membeship

2023-09-22 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14516:
-

Assignee: Sagar Rao  (was: David Jacot)

> Implement static membeship
> --
>
> Key: KAFKA-14516
> URL: https://issues.apache.org/jira/browse/KAFKA-14516
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Sagar Rao
>Priority: Major
>






[jira] [Commented] (KAFKA-14516) Implement static membeship

2023-09-22 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767939#comment-17767939
 ] 

Sagar Rao commented on KAFKA-14516:
---

[~dajac], sure, thank you very much!

> Implement static membeship
> --
>
> Key: KAFKA-14516
> URL: https://issues.apache.org/jira/browse/KAFKA-14516
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>






[jira] [Commented] (KAFKA-14504) Implement DescribeGroups API

2023-09-22 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767925#comment-17767925
 ] 

Sagar Rao commented on KAFKA-14504:
---

Sure, I would be happy to help :) 

> Implement DescribeGroups API
> 
>
> Key: KAFKA-14504
> URL: https://issues.apache.org/jira/browse/KAFKA-14504
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: Dongnuo Lyu
>Priority: Major
>
> Implement DescribeGroups API in the Group Coordinator.





[jira] [Comment Edited] (KAFKA-14504) Implement DescribeGroups API

2023-09-22 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767920#comment-17767920
 ] 

Sagar Rao edited comment on KAFKA-14504 at 9/22/23 9:23 AM:


[~dajac], is it still too urgent? I can take a stab at it if not and it's not 
already being worked upon.


was (Author: sagarrao):
[~dajac], is it still too urgent? I can take a stab at it if it's not already 
being worked upon.

> Implement DescribeGroups API
> 
>
> Key: KAFKA-14504
> URL: https://issues.apache.org/jira/browse/KAFKA-14504
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> Implement DescribeGroups API in the Group Coordinator.





[jira] [Commented] (KAFKA-14504) Implement DescribeGroups API

2023-09-22 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767920#comment-17767920
 ] 

Sagar Rao commented on KAFKA-14504:
---

[~dajac], is it still too urgent? I can take a stab at it if it's not already 
being worked upon.

> Implement DescribeGroups API
> 
>
> Key: KAFKA-14504
> URL: https://issues.apache.org/jira/browse/KAFKA-14504
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Priority: Major
>
> Implement DescribeGroups API in the Group Coordinator.





[jira] [Commented] (KAFKA-15460) Add group type filter to ListGroups API

2023-09-22 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767900#comment-17767900
 ] 

Sagar Rao commented on KAFKA-15460:
---

[~zhaohaidao] are you currently working on this? 

> Add group type filter to ListGroups API
> ---
>
> Key: KAFKA-15460
> URL: https://issues.apache.org/jira/browse/KAFKA-15460
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: David Jacot
>Assignee: HaiyuanZhao
>Priority: Major
>






[jira] [Comment Edited] (KAFKA-15473) Connect connector-plugins endpoint shows duplicate plugins

2023-09-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766738#comment-17766738
 ] 

Sagar Rao edited comment on KAFKA-15473 at 9/19/23 10:33 AM:
-

[~satish.duggana], No the API documentation doesn't mention anything about the 
presence/absence of duplicate entries. This is what it says:


{noformat}
GET /connector-plugins- return a list of connector plugins installed in the 
Kafka Connect cluster. Note that the API only checks for connectors on the 
worker that handles the request, which means you may see inconsistent results, 
especially during a rolling upgrade if you add new connector jars
{noformat}


I think the implicit assumption is that this would always return unique values, 
but as Greg pointed out above, even pre-3.6 there were cases in which this 
endpoint could return duplicate entries. Keeping that in mind, IMO this needn't 
be a release blocker and we can document it as you suggested. 


was (Author: sagarrao):
[~satish.duggana], No the API documentation doesn't mention anything about the 
presence/absence of duplicate entries. This is what it says:

??GET /connector-plugins- return a list of connector plugins installed in the 
Kafka Connect cluster. Note that the API only checks for connectors on the 
worker that handles the request, which means you may see inconsistent results, 
especially during a rolling upgrade if you add new connector jars
??
I think the implicit assumption is that these would always return unique values 
but as Greg pointed out above, even pre-3.6 there could be cases in which this 
end point can return duplicate entries. Keeping that in mind, IMO this needn't 
be a release blocker and we can document it as you suggested. 

> Connect connector-plugins endpoint shows duplicate plugins
> --
>
> Key: KAFKA-15473
> URL: https://issues.apache.org/jira/browse/KAFKA-15473
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.6.0
>
>
> In <3.6.0-rc0, only one copy of each plugin would be shown. For example:
> {noformat}
>   {
> "class": "org.apache.kafka.connect.storage.StringConverter",
> "type": "converter"
>   },{noformat}
> In 3.6.0-rc0, there are multiple listings for the same plugin. For example:
>  
> {noformat}
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter",
>     "version": "3.6.0"
>   },{noformat}
> These duplicates appear to happen when a plugin with the same class name 
> appears in multiple locations/classloaders.
> When interpreting a connector configuration, only one of these plugins will 
> be chosen, so only one is relevant to show to users. The REST API should only 
> display the plugins which are eligible to be loaded, and hide the duplicates.





[jira] [Commented] (KAFKA-15473) Connect connector-plugins endpoint shows duplicate plugins

2023-09-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766738#comment-17766738
 ] 

Sagar Rao commented on KAFKA-15473:
---

[~satish.duggana], No the API documentation doesn't mention anything about the 
presence/absence of duplicate entries. This is what it says:

??GET /connector-plugins- return a list of connector plugins installed in the 
Kafka Connect cluster. Note that the API only checks for connectors on the 
worker that handles the request, which means you may see inconsistent results, 
especially during a rolling upgrade if you add new connector jars
??
I think the implicit assumption is that this would always return unique values, 
but as Greg pointed out above, even pre-3.6 there were cases in which this 
endpoint could return duplicate entries. Keeping that in mind, IMO this needn't 
be a release blocker and we can document it as you suggested. 

> Connect connector-plugins endpoint shows duplicate plugins
> --
>
> Key: KAFKA-15473
> URL: https://issues.apache.org/jira/browse/KAFKA-15473
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.6.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Major
> Fix For: 3.6.0
>
>
> In <3.6.0-rc0, only one copy of each plugin would be shown. For example:
> {noformat}
>   {
> "class": "org.apache.kafka.connect.storage.StringConverter",
> "type": "converter"
>   },{noformat}
> In 3.6.0-rc0, there are multiple listings for the same plugin. For example:
>  
> {noformat}
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter"
>   },
>   {
>     "class": "org.apache.kafka.connect.storage.StringConverter",
>     "type": "converter",
>     "version": "3.6.0"
>   },{noformat}
> These duplicates appear to happen when a plugin with the same class name 
> appears in multiple locations/classloaders.
> When interpreting a connector configuration, only one of these plugins will 
> be chosen, so only one is relevant to show to users. The REST API should only 
> display the plugins which are eligible to be loaded, and hide the duplicates.





[jira] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2023-09-12 Thread Sagar Rao (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-14401 ]


Sagar Rao deleted comment on KAFKA-14401:
---

was (Author: sagarrao):
There are a couple of routes that can be taken here:

1) As discussed, we reinitiate the WorkThread when it dies due to an unhandled 
exception. This keeps offset reads progressing without the connectors noticing 
the failure. The problem is that for a fatal exception we might still re-create 
the thread, which could be both a good thing and a bad thing tbh. Of course we 
would still log the exception (already happening).

2) Extending idea #1, we can create a single-threaded thread pool which takes 
care of recreating the thread if it dies. It has the same pros and cons as #1.

3) The other extreme is to not accept any offsets requests and fail the 
worker somehow. 

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---
>
> Key: KAFKA-14401
> URL: https://issues.apache.org/jira/browse/KAFKA-14401
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> When a connector or task tries to read offsets from the offsets topic, it 
> invokes the `OffsetStorageImpl#offsets` method. This method gets a Future 
> from the underlying KafkaBackingStore. KafkaBackingStore invokes the 
> `KafkaBasedLog#readToEnd` method and passes a Callback, which is essentially 
> added to a queue of callbacks being managed.
> Within KafkaBasedLog, a WorkThread polls the callback queue and executes the 
> callbacks in an infinite loop. However, there is an enclosing try/catch block 
> around the while loop. If an exception is thrown that is not caught by any of 
> the inner catch blocks, control reaches the outermost catch block and the 
> WorkThread is terminated. The connectors/tasks are not aware of this and keep 
> submitting callbacks to KafkaBasedLog with nobody processing them. This can 
> be seen in the thread dumps as well:
>  
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
> tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
>     - parking to wait for  <0x00070345c9a8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>     at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
>     at 
> java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
>     at 
> org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>     at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>     at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>  {code}
>  
> We need a mechanism to fail all such offset read requests. That is because 
> even if we restart the thread, chances are it will still fail with the same 
> error so the offset fetch would be stuck perennially.
>  
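The proposed mechanism (fail all queued read requests when the work thread dies, instead of letting callers block forever on `get()`) can be sketched as follows. The class and method names are hypothetical; this is not the actual `KafkaBasedLog` implementation:

```python
# Sketch: if the log's work loop dies with an uncaught exception, fail every
# queued callback and fail fast for any callback submitted afterwards, so no
# caller hangs perennially on a Future. Hypothetical names throughout.

import queue
import threading

class BasedLog:
    def __init__(self):
        self.callbacks = queue.Queue()
        self.failed = threading.Event()

    def read_to_end(self, callback):
        if self.failed.is_set():
            # Work thread already dead: fail immediately instead of queueing.
            callback(RuntimeError("work thread is dead"), None)
            return
        self.callbacks.put(callback)

    def work_loop(self, poll_once):
        try:
            while True:
                poll_once(self)  # may raise an unhandled exception
        except Exception as err:
            self.failed.set()
            # Drain and fail everything already queued so no caller blocks.
            while not self.callbacks.empty():
                self.callbacks.get_nowait()(err, None)

def crash(_log):
    raise ValueError("unhandled exception in work thread")

results = []
log = BasedLog()
log.read_to_end(lambda err, val: results.append(type(err).__name__))
log.work_loop(crash)                                   # thread dies, queue drained
log.read_to_end(lambda err, val: results.append(type(err).__name__))
print(results)  # ['ValueError', 'RuntimeError']
```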





[jira] [Assigned] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2023-09-12 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14401:
-

Assignee: Sagar Rao  (was: Chaitanya Mukka)

> Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
> ---
>
> Key: KAFKA-14401
> URL: https://issues.apache.org/jira/browse/KAFKA-14401
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> When a connector or task tries to read offsets from the offsets topic, it 
> invokes the `OffsetStorageImpl#offsets` method. This method gets a Future 
> from the underlying KafkaBackingStore. KafkaBackingStore invokes the 
> `KafkaBasedLog#readToEnd` method and passes a Callback, which is essentially 
> added to a queue of callbacks being managed.
> Within KafkaBasedLog, a WorkThread polls the callback queue and executes the 
> callbacks in an infinite loop. However, there is an enclosing try/catch block 
> around the while loop. If an exception is thrown that is not caught by any of 
> the inner catch blocks, control reaches the outermost catch block and the 
> WorkThread is terminated. The connectors/tasks are not aware of this and keep 
> submitting callbacks to KafkaBasedLog with nobody processing them. This can 
> be seen in the thread dumps as well:
>  
> {code:java}
> "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
> tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
>    java.lang.Thread.State: WAITING (parking)
>     at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
>     - parking to wait for  <0x00070345c9a8> (a 
> java.util.concurrent.CountDownLatch$Sync)
>     at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
>     at 
> java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
>     at 
> org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
>     at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
>     at 
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>  {code}
>  
> We need a mechanism to fail all such offset read requests. That is because 
> even if we restart the thread, chances are it will still fail with the same 
> error, so the offset fetch would be stuck perennially.
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2023-09-12 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14401:
--
Description: 
When a connector or task tries to read the offsets from the offsets topic, it 
calls the `OffsetStorageImpl#offsets` method. This method gets a Future from 
the underlying KafkaBackingStore. KafkaBackingStore invokes the 
`KafkaBasedLog#readToEnd` method and passes it the Callback. This essentially 
adds the Callback to a queue of callbacks that are being managed.

Within KafkaBasedLog, there's a WorkThread which keeps polling the callback 
queue and executing callbacks, and it does this in an infinite loop. However, 
the while loop is enclosed in a try/catch block: if an exception is thrown 
that is not caught by any of the inner catch blocks, control reaches the 
outermost catch block and the WorkThread is terminated. The connectors/tasks 
are not aware of this, so they keep submitting callbacks to KafkaBasedLog 
with nobody processing them. This can be seen in the thread dumps as well:

 
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
 {code}
 

We need a mechanism to fail all such offset read requests. That is because 
even if we restart the thread, chances are it will still fail with the same 
error, so the offset fetch would be stuck perennially.
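One possible shape for such a mechanism, as a self-contained sketch (class and 
method names here are illustrative, not Kafka's actual KafkaBasedLog/WorkThread 
code): when the worker loop dies on an uncaught exception, it fails the 
in-flight callback and every queued one, so blocked `get()` callers fail fast 
instead of parking forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Illustrative sketch only -- not Kafka's actual KafkaBasedLog/WorkThread code.
public class PendingReads {
    private final BlockingQueue<CompletableFuture<Long>> callbacks = new LinkedBlockingQueue<>();

    // A connector/task submits an offset read request and blocks on the future.
    public CompletableFuture<Long> submit() {
        CompletableFuture<Long> f = new CompletableFuture<>();
        callbacks.add(f);
        return f;
    }

    // Worker loop body; 'poison' simulates an unexpected exception escaping the
    // inner catch blocks. Instead of silently terminating, the outer catch fails
    // the in-flight callback and everything still queued.
    public void runWorker(boolean poison) {
        CompletableFuture<Long> inFlight = null;
        try {
            while ((inFlight = callbacks.poll()) != null) {
                if (poison) {
                    throw new IllegalStateException("work thread died");
                }
                inFlight.complete(42L);  // normal case: deliver the read result
            }
        } catch (Exception fatal) {
            if (inFlight != null) {
                inFlight.completeExceptionally(fatal);
            }
            CompletableFuture<Long> cb;
            while ((cb = callbacks.poll()) != null) {
                cb.completeExceptionally(fatal);  // unblock every waiting caller
            }
        }
    }
}
```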

 

  was:
When a connector or task tries to read the offsets from the offsets topic, it 
issues `OffsetStorageImpl#offsets` method. This method gets a Future from the 
underneath KafkaBackingStore. KafkaBackingStore invokes 
`KafkaBasedLog#readToEnd` method and passes the Callback. This method 
essentially adds the Callback to a Queue of callbacks that are being managed.

Within KafkaBasedLog, there's a WorkThread which keeps polling over the 
callback queue and executes them and it does this in an infinite loop. However, 
there is an enclosing try/catch block around the while loop. If there's an 
exception thrown which is not caught by any of the other catch blocks, the 
control goes to the outermost catch block and the WorkThread is terminated. 
However, the connectors/tasks are not aware of this and they would keep 
submitting callbacks to KafkaBasedLog with nobody processing them. This can be 
seen in the thread dumps as well:

 
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
 {code}
 

We need a mechanism to restart the WorkThread if it dies. 

[jira] [Commented] (KAFKA-15408) Restart failed tasks in Kafka Connect up to a configurable max-tries

2023-08-28 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759648#comment-17759648
 ] 

Sagar Rao commented on KAFKA-15408:
---

Hi, the steps are outlined here: 
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> Restart failed tasks in Kafka Connect up to a configurable max-tries
> 
>
> Key: KAFKA-15408
> URL: https://issues.apache.org/jira/browse/KAFKA-15408
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Patrick Pang
>Priority: Major
>  Labels: needs-kip
>
> h2. Issue
> Currently, Kafka Connect just reports failed tasks on the REST API, with 
> the error. Users are expected to monitor the status and restart individual 
> connectors if there are transient errors. Unfortunately these are common for 
> database connectors, e.g. transient connection errors, DNS flips, database 
> downtime, etc. Kafka Connect silently failing in these scenarios would 
> lead to stale data downstream.
> h2. Proposal
> Kafka Connect should be able to restart failed tasks automatically, up to a 
> configurable max-tries.
> h2. Prior arts
>  * 
> [https://github.com/strimzi/proposals/blob/main/007-restarting-kafka-connect-connectors-and-tasks.md]
>  
>  * 
> [https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/enable-automatic-restart]
>  





[jira] [Updated] (KAFKA-15408) Restart failed tasks in Kafka Connect up to a configurable max-tries

2023-08-28 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-15408:
--
Labels: needs-kip  (was: )

> Restart failed tasks in Kafka Connect up to a configurable max-tries
> 
>
> Key: KAFKA-15408
> URL: https://issues.apache.org/jira/browse/KAFKA-15408
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Patrick Pang
>Priority: Major
>  Labels: needs-kip
>
> h2. Issue
> Currently, Kafka Connect just reports failed tasks on the REST API, with 
> the error. Users are expected to monitor the status and restart individual 
> connectors if there are transient errors. Unfortunately these are common for 
> database connectors, e.g. transient connection errors, DNS flips, database 
> downtime, etc. Kafka Connect silently failing in these scenarios would 
> lead to stale data downstream.
> h2. Proposal
> Kafka Connect should be able to restart failed tasks automatically, up to a 
> configurable max-tries.
> h2. Prior arts
>  * 
> [https://github.com/strimzi/proposals/blob/main/007-restarting-kafka-connect-connectors-and-tasks.md]
>  
>  * 
> [https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/enable-automatic-restart]
>  





[jira] [Commented] (KAFKA-15408) Restart failed tasks in Kafka Connect up to a configurable max-tries

2023-08-28 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759529#comment-17759529
 ] 

Sagar Rao commented on KAFKA-15408:
---

[~patrickpang], thanks for filing this! IMO, this is a feature which is long 
overdue in the Connect framework. Do you plan to pick this one up? I ask 
because, if the answer is yes, we would need a KIP for this, considering we 
might change some behaviour: the status endpoint responses might not reflect 
a task failure as soon as it happens. Also, the configurable max-tries 
possibly means the addition of a new config. 
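A bounded-restart policy of the kind this would need could look roughly like 
the following sketch; the class, method names, and the idea of a dedicated 
config key feeding `maxTries` are all hypothetical, not an existing Kafka 
Connect API.

```java
// Hypothetical restart policy for the proposal above; the class, methods, and
// any config key feeding maxTries are illustrative, not an existing Connect API.
public class RestartPolicy {
    private final int maxTries;  // the proposed configurable max-tries
    private int attempts = 0;

    public RestartPolicy(int maxTries) {
        this.maxTries = maxTries;
    }

    // Called by the runtime when a task fails; true means "restart it again".
    public boolean shouldRestart() {
        if (attempts >= maxTries) {
            return false;  // give up and surface FAILED via the REST API
        }
        attempts++;
        return true;
    }

    // Exponential backoff between attempts, capped at one minute, so transient
    // database/DNS blips get time to clear before the next try.
    public long nextBackoffMs() {
        return Math.min(60_000L, 1_000L * (1L << Math.min(attempts, 6)));
    }
}
```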

> Restart failed tasks in Kafka Connect up to a configurable max-tries
> 
>
> Key: KAFKA-15408
> URL: https://issues.apache.org/jira/browse/KAFKA-15408
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Patrick Pang
>Priority: Major
>
> h2. Issue
> Currently, Kafka Connect just reports failed tasks on the REST API, with 
> the error. Users are expected to monitor the status and restart individual 
> connectors if there are transient errors. Unfortunately these are common for 
> database connectors, e.g. transient connection errors, DNS flips, database 
> downtime, etc. Kafka Connect silently failing in these scenarios would 
> lead to stale data downstream.
> h2. Proposal
> Kafka Connect should be able to restart failed tasks automatically, up to a 
> configurable max-tries.
> h2. Prior arts
>  * 
> [https://github.com/strimzi/proposals/blob/main/007-restarting-kafka-connect-connectors-and-tasks.md]
>  
>  * 
> [https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/enable-automatic-restart]
>  





[jira] [Commented] (KAFKA-14429) Move OffsetStorageReader from storage package to source package

2023-08-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756308#comment-17756308
 ] 

Sagar Rao commented on KAFKA-14429:
---

hey [~gharris1727], given Chris's comment above, do you think we can close 
this one?

> Move OffsetStorageReader from storage package to source package
> ---
>
> Key: KAFKA-14429
> URL: https://issues.apache.org/jira/browse/KAFKA-14429
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Greg Harris
>Priority: Minor
>  Labels: needs-kip
>
> The OffsetStorageReader is an interface provided to source connectors. This 
> does not fit with the broader context of the `storage` package, which is 
> focused on sink/source-agnostic converters and serialization/deserialization.
> The current interface should be deprecated and extend from the relocated 
> interface in a different package.





[jira] [Assigned] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent

2023-08-19 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14138:
-

Assignee: (was: Sagar Rao)

> The Exception Throwing Behavior of Transactional Producer is Inconsistent
> -
>
> Key: KAFKA-14138
> URL: https://issues.apache.org/jira/browse/KAFKA-14138
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Guozhang Wang
>Priority: Critical
>
> There's an issue for inconsistent error throwing inside Kafka Producer when 
> transactions are enabled. In short, there are two places where the received 
> error code from the brokers would be eventually thrown to the caller:
> * Recorded on the batch's metadata, via "Sender#failBatch"
> * Recorded on the txn manager, via "txnManager#handleFailedBatch".
> The former would be thrown from 1) the `Future` returned from 
> `send`; or 2) the `callback` inside `send(record, callback)`. The 
> latter would be thrown from `producer.send()` directly, in which we call 
> `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown 
> from the former, it's not wrapped, hence the direct exception (e.g. 
> ClusterAuthorizationException), whereas in the latter it's wrapped as, e.g., 
> KafkaException(ClusterAuthorizationException). Which one is thrown depends 
> on a race condition, since we cannot control whether, by the time the caller 
> thread calls `txnManager.maybeAddPartition`, the previous produce request's 
> error has been sent back or not.
> For example consider the following sequence for idempotent producer:
> 1. caller thread: within future = producer.send(), call 
> recordAccumulator.append
> 2. sender thread: drain the accumulator, send the produceRequest and get the 
> error back.
> 3. caller thread: within future = producer.send(), call 
> txnManager.maybeAddPartition, in which we would check `maybeFailWithError` 
> before `isTransactional`.
> 4. caller thread: future.get()
> In a sequence where 3) happens before 2), we would only get the raw 
> exception at step 4; in a sequence where 2) happens before 3), we would 
> throw the exception immediately at 3).
> This inconsistent error throwing is pretty annoying for users since they'd 
> need to handle both cases, but many of them actually do not know this 
> trickiness. We should make the error throwing consistent, e.g. we should 
> consider: 1) which errors would be thrown from callback / future.get, and 
> which would be thrown from the `send` call directly, and these errors should 
> better be non-overlapping, 2) whether we should wrap the raw error or not, we 
> should do so consistently.
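Until the throwing behavior is made consistent, callers effectively have to 
normalize both shapes themselves. A minimal illustration of that 
normalization, using a local stand-in class instead of the real 
org.apache.kafka.common.KafkaException (this is a self-contained sketch, not 
producer code):

```java
// Illustrative helper for the inconsistency described above: callers may see
// either the raw exception or one wrapped in a KafkaException-style wrapper,
// depending on the race. Normalizing to the root cause lets application code
// handle both paths identically. WrapperException stands in for
// org.apache.kafka.common.KafkaException in this self-contained sketch.
public class Unwrap {
    public static class WrapperException extends RuntimeException {
        public WrapperException(Throwable cause) {
            super(cause);
        }
    }

    // Peel off any wrapper layers and return the underlying exception.
    public static Throwable rootCause(Throwable t) {
        Throwable cur = t;
        while (cur instanceof WrapperException && cur.getCause() != null) {
            cur = cur.getCause();
        }
        return cur;
    }
}
```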





[jira] [Comment Edited] (KAFKA-15339) Transient I/O error happening in appending records could lead to the halt of whole cluster

2023-08-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755812#comment-17755812
 ] 

Sagar Rao edited comment on KAFKA-15339 at 8/18/23 5:52 AM:


Thanks for reporting this bug [~functioner]. As you correctly pointed out, 
there seems to be no retry mechanism when appending records to the log file 
backing the topic. From what I can see, whenever an append fails, 
`LocalLog#maybeHandleIOException` is invoked, which marks the given directory 
as offline, and I believe that triggers the rest of the errors you see on the 
Connect cluster. While the suggestion you provided is a valid one, i.e. to 
retry in such cases, I couldn't find that pattern being used anywhere in this 
part of the code.
Having said that, have you noticed this error being thrown for versions < 3.5? 
The reason I ask is that in v3.5 this class was migrated to the storage 
module. Here's the [link|https://github.com/apache/kafka/pull/13041] to the 
PR. AFAICS there is not much difference and we should have seen the same 
behaviour, but I was just curious. LMK. Otherwise, we can look at adding the 
retry mechanism, but I am not sure how big or small that change would be, 
considering classes like UnifiedLog seem to be on a critical path.
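The kind of bounded retry discussed above could be sketched as a small 
wrapper; this is purely illustrative and not the actual LocalLog/UnifiedLog 
code path:

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Sketch of the bounded retry discussed above; purely illustrative, not the
// actual LocalLog/UnifiedLog code path.
public class IoRetry {
    public static <T> T withRetries(Callable<T> op, int maxAttempts) throws Exception {
        IOException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (IOException e) {
                last = e;  // possibly transient fault: retry instead of
                           // immediately marking the log directory offline
            }
        }
        throw last;  // retries exhausted: now it is fair to fail the directory
    }
}
```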




was (Author: sagarrao):
Thanks for reporting this bug [~functioner]. As you correctly pointed out, 
there seems to be no retry mechanism while trying to append files to the topic 
which is backed by a file. From what I can see, whenever it fails, 
`LocalLog#maybeHandleIOException` is invoked which marks the given directory as 
offline which I believe triggers the rest of the errors that you see on the 
Connect cluster. While the suggestion you provided is a valid one i.e to retry 
in such cases, I couldn't find that pattern being used anywhere in this part of 
the code.
Having said that, have you noticed this error being thrown for versions < 3.5? 
The reason I ask that is that in v3.5, a migration of this class to the storage 
module was done. Here's the [link|https://github.com/apache/kafka/pull/13041] 
of the PR. AFAICS, there is not much difference and we should have experienced 
the same behaviour but was just curious. LMK. Or else, we can look at adding 
the retry mechanism



> Transient I/O error happening in appending records could lead to the halt of 
> whole cluster
> --
>
> Key: KAFKA-15339
> URL: https://issues.apache.org/jira/browse/KAFKA-15339
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, producer 
>Affects Versions: 3.5.0
>Reporter: Haoze Wu
>Priority: Major
>
> We are running an integration test in which we start an Embedded Connect 
> Cluster in the active 3.5 branch. However, because of transient disk error, 
> we may encounter an IOException during appending records to one topic. As 
> shown in the stack trace: 
> {code:java}
> [2023-08-13 16:53:51,016] ERROR Error while appending records to 
> connect-config-topic-connect-cluster-0 in dir 
> /tmp/EmbeddedKafkaCluster8003464883598783225 
> (org.apache.kafka.storage.internals.log.LogDirFailureChannel:61)
> java.io.IOException: 
>         at 
> org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:92)
>         at 
> org.apache.kafka.common.record.FileRecords.append(FileRecords.java:188)
>         at kafka.log.LogSegment.append(LogSegment.scala:161)
>         at kafka.log.LocalLog.append(LocalLog.scala:436)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:853)
>         at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:664)
>         at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1281)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1269)
>         at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:977)
>         at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:965)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:623)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:680)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:180)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
>         at java.lang.Thread.run(Thread.java:748) {code}
> However, just because of a failure to append records to one partition, the 
> fetchers for all the other partitions are removed, the broker shuts down, 
> and finally 

[jira] [Commented] (KAFKA-15339) Transient I/O error happening in appending records could lead to the halt of whole cluster

2023-08-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755812#comment-17755812
 ] 

Sagar Rao commented on KAFKA-15339:
---

Thanks for reporting this bug [~functioner]. As you correctly pointed out, 
there seems to be no retry mechanism when appending records to the log file 
backing the topic. From what I can see, whenever an append fails, 
`LocalLog#maybeHandleIOException` is invoked, which marks the given directory 
as offline, and I believe that triggers the rest of the errors you see on the 
Connect cluster. While the suggestion you provided is a valid one, i.e. to 
retry in such cases, I couldn't find that pattern being used anywhere in this 
part of the code.
Having said that, have you noticed this error being thrown for versions < 3.5? 
The reason I ask is that in v3.5 this class was migrated to the storage 
module. Here's the [link|https://github.com/apache/kafka/pull/13041] to the 
PR. AFAICS there is not much difference and we should have seen the same 
behaviour, but I was just curious. LMK. Otherwise, we can look at adding the 
retry mechanism.



> Transient I/O error happening in appending records could lead to the halt of 
> whole cluster
> --
>
> Key: KAFKA-15339
> URL: https://issues.apache.org/jira/browse/KAFKA-15339
> Project: Kafka
>  Issue Type: Improvement
>  Components: connect, producer 
>Affects Versions: 3.5.0
>Reporter: Haoze Wu
>Priority: Major
>
> We are running an integration test in which we start an Embedded Connect 
> Cluster in the active 3.5 branch. However, because of transient disk error, 
> we may encounter an IOException during appending records to one topic. As 
> shown in the stack trace: 
> {code:java}
> [2023-08-13 16:53:51,016] ERROR Error while appending records to 
> connect-config-topic-connect-cluster-0 in dir 
> /tmp/EmbeddedKafkaCluster8003464883598783225 
> (org.apache.kafka.storage.internals.log.LogDirFailureChannel:61)
> java.io.IOException: 
>         at 
> org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:92)
>         at 
> org.apache.kafka.common.record.FileRecords.append(FileRecords.java:188)
>         at kafka.log.LogSegment.append(LogSegment.scala:161)
>         at kafka.log.LocalLog.append(LocalLog.scala:436)
>         at kafka.log.UnifiedLog.append(UnifiedLog.scala:853)
>         at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:664)
>         at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1281)
>         at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1269)
>         at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:977)
>         at 
> scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
>         at 
> scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
>         at scala.collection.mutable.HashMap.map(HashMap.scala:35)
>         at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:965)
>         at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:623)
>         at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:680)
>         at kafka.server.KafkaApis.handle(KafkaApis.scala:180)
>         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76)
>         at java.lang.Thread.run(Thread.java:748) {code}
> However, just because of a failure to append records to one partition, the 
> fetchers for all the other partitions are removed, the broker shuts down, 
> and finally the embedded Connect cluster is killed as a whole. 
> {code:java}
> [2023-08-13 17:35:37,966] WARN Stopping serving logs in dir 
> /tmp/EmbeddedKafkaCluster6777164631574762227 (kafka.log.LogManager:70)
> [2023-08-13 17:35:37,968] ERROR Shutdown broker because all log dirs in 
> /tmp/EmbeddedKafkaCluster6777164631574762227 have failed 
> (kafka.log.LogManager:143)
> [2023-08-13 17:35:37,968] WARN Abrupt service halt with code 1 and message 
> null (org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster:130)
> [2023-08-13 17:35:37,968] ERROR [LogDirFailureHandler]: Error due to 
> (kafka.server.ReplicaManager$LogDirFailureHandler:135)
> org.apache.kafka.connect.util.clusters.UngracefulShutdownException: Abrupt 
> service halt with code 1 and message null {code}
> I am wondering if we could add a configurable retry around the root cause 
> to tolerate possible I/O faults, so that if the retry is successful the 
> embedded Connect cluster could still operate. 
> Any comments and suggestions would be appreciated.





[jira] [Commented] (KAFKA-15354) Partition leader is not evenly distributed in kraft mode

2023-08-17 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755805#comment-17755805
 ] 

Sagar Rao commented on KAFKA-15354:
---

[~dengziming], I took a look at this. I believe this is happening because, 
when we are trying to find the first replica of a new partition 
[here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java#L362],
 we reset the index back to 0 when the epochs don't match 
[here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java#L190].

In the test case you supplied, when we are adding partition 2, the epoch known 
to the brokers in rack 1 is 1 but the new incoming epoch is 2, so the index is 
reset back to 0. I think that's why in this round as well we see broker 1 
being assigned as the leader. WDYT?
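The effect of resetting the cursor can be shown with a toy round-robin model 
(illustrative only, not the actual StripedReplicaPlacer code): resetting the 
index each round keeps picking the same broker, while a persistent cursor 
spreads leaders across the rack's brokers.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the behaviour described above -- not the real StripedReplicaPlacer.
// If the broker cursor is reset to 0 on every placement round (the epoch
// mismatch case), the same broker keeps getting picked as leader; a cursor
// that persists across rounds spreads leaders evenly.
public class CursorReset {
    public static List<Integer> leaders(int[] brokers, int partitions, boolean resetEachRound) {
        List<Integer> out = new ArrayList<>();
        int cursor = 0;
        for (int p = 0; p < partitions; p++) {
            if (resetEachRound) {
                cursor = 0;  // simulates the index being set back to 0
            }
            out.add(brokers[cursor % brokers.length]);
            cursor++;
        }
        return out;
    }
}
```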

> Partition leader is not evenly distributed in kraft mode
> 
>
> Key: KAFKA-15354
> URL: https://issues.apache.org/jira/browse/KAFKA-15354
> Project: Kafka
>  Issue Type: Bug
>Reporter: Deng Ziming
>Priority: Major
>
> In StripedReplicaPlacerTest, we can create a test below to reproduce this bug.
> {code:java}
> // code placeholder
> @Test
> public void testReplicaDistribution() {
> MockRandom random = new MockRandom();
> StripedReplicaPlacer placer = new StripedReplicaPlacer(random);
> TopicAssignment assignment = place(placer, 0, 4, (short) 2, Arrays.asList(
> new UsableBroker(0, Optional.of("0"), false),
> new UsableBroker(1, Optional.of("0"), false),
> new UsableBroker(2, Optional.of("1"), false),
> new UsableBroker(3, Optional.of("1"), false)));
> System.out.println(assignment);
> } {code}
> In StripedReplicaPlacer, we only ensure leaders are distributed evenly 
> across racks, but we don't ensure leaders are evenly distributed across 
> nodes. In the test above, we have 4 nodes: 1 2 3 4, and create 4 partitions, 
> but the leaders are 1 2 1 2. In zk mode this is ensured; see 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment





[jira] [Commented] (KAFKA-15334) DescribeQuorum should not be seen as cluster action

2023-08-12 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753575#comment-17753575
 ] 

Sagar Rao commented on KAFKA-15334:
---

hey [~dengziming], if it isn't too critical, can I take a stab at it? I am new 
to this part of the code, but from a quick glance it appears we need to do 
something similar to what is currently done for `handleElectLeaders` within 
ControllerApis. 

> DescribeQuorum should not be seen as cluster action 
> 
>
> Key: KAFKA-15334
> URL: https://issues.apache.org/jira/browse/KAFKA-15334
> Project: Kafka
>  Issue Type: Bug
>Reporter: Deng Ziming
>Priority: Major
>






[jira] [Created] (KAFKA-15296) Allow committing offsets for Dropped records via SMTs

2023-08-02 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15296:
-

 Summary: Allow committing offsets for Dropped records via SMTs
 Key: KAFKA-15296
 URL: https://issues.apache.org/jira/browse/KAFKA-15296
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


Currently the Connect runtime doesn't commit the offsets of records which have 
been dropped by an SMT. This can lead to issues if the dropped record's 
partition reflects a source partition and the connector depends upon the 
committed offsets to make progress; in such cases, the connector might just 
stall. We should enable committing offsets for dropped records as well. Note 
that today, if a record is dropped because exactly-once support is enabled and 
the connector chose to abort the batch containing the record, its offset is 
still committed. So there already exists a discrepancy in the way the runtime 
treats these dropped records.





[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms

2023-08-01 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-15229:
--
Description: 
The Kafka Connect config 
[task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]
 has a default value of 5s. As per its definition:

 
{noformat}
Amount of time to wait for tasks to shutdown gracefully. This is the total 
amount of time, not per task. All task have shutdown triggered, then they are 
waited on sequentially.{noformat}

it is the total timeout for all tasks to shut down. Also, if multiple tasks 
are to be shut down, they are waited upon sequentially. The default value of 
this config is fine for smaller clusters with fewer tasks, but on a larger 
cluster the timeout can elapse and we will see a lot of messages of the form 



{noformat}
Graceful stop of task  failed.
{noformat}



In case of failure in graceful stop of tasks, the tasks are cancelled which 
means that they won't send out a status update. Once that happens there won't 
be any `UNASSIGNED` status message posted for that task. Let's say the task 
stop was triggered by a worker going down. If the cluster is configured to use 
Incremental Cooperative Assignor, then the task wouldn't be reassigned until 
scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, the 
task would show up with status RUNNING whenever its status is queried. This 
can be confusing for the users.

This problem can be exacerbated on cloud environments(like kubernetes pods) 
because there is a high chance that the running status would be associated with 
an older worker_id which doesn't even exist in the cluster anymore. 

While the net effect of all of this is not catastrophic, i.e. it won't lead 
to any processing delays or loss of data, the status of the task would be 
off. And if there are fast rebalances happening under the Incremental 
Cooperative Assignor, that duration could be high as well. 

So, the proposal is to increase the default to a higher value. I am thinking 
we can set it to 60s because, as far as I can see, it doesn't interfere with 
any other timeout that we have. 

Also, while users can set this config on their clusters, it is a 
low-importance config and generally goes unnoticed. So I believe we should 
increase the default value, especially for future users of the clusters who 
might not look at this config in detail.
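The "total amount of time, not per task" semantics can be modeled as 
sequential waits against one shared deadline; the sketch below is illustrative 
only, not the actual Connect Worker shutdown code.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

// Sketch of the "total amount of time, not per task" semantics: tasks are
// awaited sequentially against one shared deadline. Illustrative only, not
// the actual Connect Worker shutdown code.
public class GracefulStop {
    public interface StoppableTask {
        // Returns true if the task stopped within the given budget.
        boolean awaitStop(long timeoutMs);
    }

    // Returns how many tasks stopped gracefully before the total timeout ran out.
    public static int awaitAll(List<StoppableTask> tasks, long totalTimeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMs);
        int graceful = 0;
        for (StoppableTask t : tasks) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            // Once the shared budget is gone, remaining tasks are cancelled
            // without waiting -- this is where the "Graceful stop of task
            // failed" messages come from on large clusters.
            if (remainingMs > 0 && t.awaitStop(remainingMs)) {
                graceful++;
            }
        }
        return graceful;
    }
}
```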




  was:
The Kafka Connect config [task.shutdown.graceful.timeout.ms. 
|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has
 a default value of 5s. As per it's definition:

 
{noformat}
Amount of time to wait for tasks to shutdown gracefully. This is the total 
amount of time, not per task. All task have shutdown triggered, then they are 
waited on sequentially.{noformat}

it is the total timeout for all tasks to shutdown. Also, if multiple tasks are 
to be shutdown then, they are waited upon sequentially. Now the default value 
of this config is ok for smaller clusters with less number of tasks, on a 
larger cluster because the timeout can elapse we will see a lot of messages of 
the form 



{noformat}
Graceful stop of task  failed.
{noformat}



When the graceful stop of a task fails, the task is cancelled, which means it 
won't send out a status update. Once that happens there won't be any 
`UNASSIGNED` status message posted for that task. Let's say the task stop was 
triggered by a worker going down. If the cluster is configured to use the 
Incremental Cooperative Assignor, then the task wouldn't be reassigned until 
the scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, 
the task would show up with status RUNNING whenever its status is queried. 
This can be confusing for users.

This problem can be exacerbated in cloud environments (like Kubernetes pods) 
because there is a high chance that the RUNNING status would be associated with 
an older worker_id which no longer exists in the cluster. 

While the net effect of all of this is not catastrophic (it won't lead to any 
processing delays or loss of data), the status of the task would be off. And 
if there are fast rebalances happening under the Incremental Cooperative 
Assignor, that duration could be high as well. 

So, the proposal is to increase the default value. I am thinking we can set it 
to 60s because, as far as I can see, it doesn't interfere with any other 
timeout that we have. 

Also, while users can set this config on their clusters, it is a low-importance 
config and generally goes unnoticed. So I believe we should increase the 
default value. 





> Increase default value of task.shutdown.graceful.timeout.ms
> ---
>
>   

[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms

2023-08-01 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-15229:
--
Description: 
The Kafka Connect config 
[task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]
 has a default value of 5s. As per its definition:

 
{noformat}
Amount of time to wait for tasks to shutdown gracefully. This is the total 
amount of time, not per task. All task have shutdown triggered, then they are 
waited on sequentially.{noformat}

it is the total timeout for all tasks to shut down. Also, if multiple tasks are 
to be shut down, they are waited upon sequentially. The default value of this 
config is fine for smaller clusters with fewer tasks, but on a larger cluster 
the timeout can elapse and we will see a lot of messages of the form 



{noformat}
Graceful stop of task  failed.
{noformat}



When the graceful stop of a task fails, the task is cancelled, which means it 
won't send out a status update. Once that happens there won't be any 
`UNASSIGNED` status message posted for that task. Let's say the task stop was 
triggered by a worker going down. If the cluster is configured to use the 
Incremental Cooperative Assignor, then the task wouldn't be reassigned until 
the scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, 
the task would show up with status RUNNING whenever its status is queried. 
This can be confusing for users.

This problem can be exacerbated in cloud environments (like Kubernetes pods) 
because there is a high chance that the RUNNING status would be associated with 
an older worker_id which no longer exists in the cluster. 

While the net effect of all of this is not catastrophic (it won't lead to any 
processing delays or loss of data), the status of the task would be off. And 
if there are fast rebalances happening under the Incremental Cooperative 
Assignor, that duration could be high as well. 

So, the proposal is to increase the default value. I am thinking we can set it 
to 60s because, as far as I can see, it doesn't interfere with any other 
timeout that we have. 

Also, while users can set this config on their clusters, it is a low-importance 
config and generally goes unnoticed. So I believe we should increase the 
default value. 




  was:
The Kafka Connect config 
[task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]
 has a default value of 5s. As per its definition:

 
{noformat}
Amount of time to wait for tasks to shutdown gracefully. This is the total 
amount of time, not per task. All task have shutdown triggered, then they are 
waited on sequentially.{noformat}

it is the total timeout for all tasks to shut down. Also, if multiple tasks are 
to be shut down, they are waited upon sequentially. The default value of this 
config is fine for smaller clusters with fewer tasks, but on a larger cluster 
the timeout can elapse and we will see a lot of messages of the form 



{noformat}
Graceful stop of task  failed.
{noformat}



When the graceful stop of a task fails, the task is cancelled, which means it 
won't send out a status update. Once that happens there won't be any 
`UNASSIGNED` status message posted for that task. Let's say the task stop was 
triggered by a worker going down. If the cluster is configured to use the 
Incremental Cooperative Assignor, then the task wouldn't be reassigned until 
the scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, 
the task would show up with status RUNNING whenever its status is queried. 
This can be confusing for users.

This problem can be exacerbated in cloud environments (like Kubernetes pods) 
because there is a high chance that the RUNNING status would be associated with 
an older worker_id which no longer exists in the cluster. 

While the net effect of all of this is not catastrophic (it won't lead to any 
processing delays or loss of data), the status of the task would be off. And 
if there are fast rebalances happening under the Incremental Cooperative 
Assignor, that duration could be high as well. 

So, the proposal is to increase the default value. I am thinking we can set it 
to 60s because, as far as I can see, it doesn't interfere with any other 
timeout that we have. 

Also, while users can set this config on their clusters, it is a low-importance 
config and generally goes unnoticed. So I believe we should increase the 
default value. 
I am tagging this as needs-kip because I believe we will need one.





> Increase default value of task.shutdown.graceful.timeout.ms
> ---
>
> Key: 

[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms

2023-08-01 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-15229:
--
Labels:   (was: needs-kip)

> Increase default value of task.shutdown.graceful.timeout.ms
> ---
>
> Key: KAFKA-15229
> URL: https://issues.apache.org/jira/browse/KAFKA-15229
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> The Kafka Connect config 
> [task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]
>  has a default value of 5s. As per its definition:
>  
> {noformat}
> Amount of time to wait for tasks to shutdown gracefully. This is the total 
> amount of time, not per task. All task have shutdown triggered, then they are 
> waited on sequentially.{noformat}
> it is the total timeout for all tasks to shut down. Also, if multiple tasks 
> are to be shut down, they are waited upon sequentially. The default value of 
> this config is fine for smaller clusters with fewer tasks, but on a larger 
> cluster the timeout can elapse and we will see a lot of messages of the form 
> {noformat}
> Graceful stop of task  failed.
> {noformat}
> When the graceful stop of a task fails, the task is cancelled, which means 
> it won't send out a status update. Once that happens there won't be any 
> `UNASSIGNED` status message posted for that task. Let's say the task stop 
> was triggered by a worker going down. If the cluster is configured to use 
> the Incremental Cooperative Assignor, then the task wouldn't be reassigned 
> until the scheduled.rebalance.delay.max.ms interval elapses. So, for that 
> duration, the task would show up with status RUNNING whenever its status is 
> queried. This can be confusing for users.
> This problem can be exacerbated in cloud environments (like Kubernetes pods) 
> because there is a high chance that the RUNNING status would be associated 
> with an older worker_id which no longer exists in the cluster. 
> While the net effect of all of this is not catastrophic (it won't lead to 
> any processing delays or loss of data), the status of the task would be off. 
> And if there are fast rebalances happening under the Incremental Cooperative 
> Assignor, that duration could be high as well. 
> So, the proposal is to increase the default value. I am thinking we can set 
> it to 60s because, as far as I can see, it doesn't interfere with any other 
> timeout that we have. 
> Also, while users can set this config on their clusters, it is a 
> low-importance config and generally goes unnoticed. So I believe we should 
> increase the default value. 
> I am tagging this as needs-kip because I believe we will need one.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Long for schema-less case

2023-07-25 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746881#comment-17746881
 ] 

Sagar Rao commented on KAFKA-10457:
---

Thanks Yash. Yeah, probably due to the workaround it hasn't received enough 
traction. Do you think we should wait for other users to report similar errors 
(it's been around for a while though), or should we go ahead and add a new 
datatype? I see no harm in adding a new datatype.
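The narrowing the issue describes can be reproduced with a couple of lines (a minimal standalone sketch, independent of JsonConverter): taking the low 64 bits of Long.MAX_VALUE + 1 wraps around to Long.MIN_VALUE.

```java
import java.math.BigInteger;

// Demonstrates the narrowing behind the reported failure: Long.MAX_VALUE + 1
// cannot be represented as a long, so BigInteger.longValue() (which keeps only
// the low-order 64 bits) wraps around to Long.MIN_VALUE.
public class BigIntegerNarrowing {
    public static void main(String[] args) {
        BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);
        System.out.println(value);             // 9223372036854775808
        System.out.println(value.longValue()); // -9223372036854775808 (wrapped)
    }
}
```

This is exactly the expected-vs-actual pair in the issue's assertion failure, which is why a wider datatype (or keeping the value as BigInteger) is needed for the schema-less path.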

> JsonConverter.toConnectData trims BigInteger to Long for schema-less case
> -
>
> Key: KAFKA-10457
> URL: https://issues.apache.org/jira/browse/KAFKA-10457
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
>
>  
> When _JsonConverter_ is configured with _schemas.enable=false_ and a value 
> exceeding _Long_ is passed, the result is incorrect since the converter 
> trims it to _Long:_
> {code:java}
> Map<String, Object> props = Collections.singletonMap("schemas.enable", 
> false);
> converter.configure(props, true);
> BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new 
> BigInteger("1"));
> String msg = value.toString();
> SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
> msg.getBytes());
> assertNull(schemaAndValue.schema());
> assertEquals(value, schemaAndValue.value());
> {code}
>  
>  Fails with:
>  
> {code:java}
> expected:<9223372036854775808> but was:<-9223372036854775808>
> Expected :9223372036854775808
> Actual :-9223372036854775808
> {code}
>  
>  





[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct

2023-07-24 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746774#comment-17746774
 ] 

Sagar Rao commented on KAFKA-15005:
---

[~LucentWong], the PR for https://issues.apache.org/jira/browse/KAFKA-12525 has 
been merged. Closing this one. Thanks!

> Status of KafkaConnect task not correct
> ---
>
> Key: KAFKA-15005
> URL: https://issues.apache.org/jira/browse/KAFKA-15005
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.0.0, 3.3.2
>Reporter: Yu Wang
>Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found there were several tasks 
> always in *UNASSIGNED* status, even though the real tasks had already 
> started. 
> So we dumped the payload of the Kafka Connect status topic, and found that 
> the last two status changes were *RUNNING* followed by {*}UNASSIGNED{*}.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643}
>  {code}
> But usually, the RUNNING status should be appended after the UNASSIGNED, 
> because the worker coordinator will revoke the tasks before starting new 
> tasks.
> Then we checked the logs of our MM2 workers and found that, during that 
> time, there was a task that was revoked on worker-2 and started on worker-1.
>  
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, 
> groupId=__group] Starting task task-7 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
>  
> So I think the incorrect status was caused by the revoked task finishing 
> later than the newly started task, which made the UNASSIGNED status append 
> to the status topic after the RUNNING status. 
>  
> After reading the code of DistributedHerder, I found that task revoking runs 
> in a thread pool; the revoke operation just returns after submitting all the 
> callables. So even in the same worker, there is no guarantee that the revoke 
> operation will always finish before the new tasks start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
> callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance"); {code}
>  





[jira] [Resolved] (KAFKA-15005) Status of KafkaConnect task not correct

2023-07-24 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-15005.
---
Resolution: Duplicate

> Status of KafkaConnect task not correct
> ---
>
> Key: KAFKA-15005
> URL: https://issues.apache.org/jira/browse/KAFKA-15005
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.0.0, 3.3.2
>Reporter: Yu Wang
>Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found there were several tasks 
> always in *UNASSIGNED* status, even though the real tasks had already 
> started. 
> So we dumped the payload of the Kafka Connect status topic, and found that 
> the last two status changes were *RUNNING* followed by {*}UNASSIGNED{*}.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643}
>  {code}
> But usually, the RUNNING status should be appended after the UNASSIGNED, 
> because the worker coordinator will revoke the tasks before starting new 
> tasks.
> Then we checked the logs of our MM2 workers and found that, during that 
> time, there was a task that was revoked on worker-2 and started on worker-1.
>  
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, 
> groupId=__group] Starting task task-7 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
>  
> So I think the incorrect status was caused by the revoked task finishing 
> later than the newly started task, which made the UNASSIGNED status append 
> to the status topic after the RUNNING status. 
>  
> After reading the code of DistributedHerder, I found that task revoking runs 
> in a thread pool; the revoke operation just returns after submitting all the 
> callables. So even in the same worker, there is no guarantee that the revoke 
> operation will always finish before the new tasks start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
> callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance"); {code}
>  





[jira] [Commented] (KAFKA-15239) producerPerformance system test for old client failed after v3.5.0

2023-07-24 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746253#comment-17746253
 ] 

Sagar Rao commented on KAFKA-15239:
---

[~showuon], I believe we have this tools migration guideline: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines.
 I also think some of the tool migration efforts are following it. Federico 
might have more inputs on this.

> producerPerformance system test for old client failed after v3.5.0
> --
>
> Key: KAFKA-15239
> URL: https://issues.apache.org/jira/browse/KAFKA-15239
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 3.6.0
>Reporter: Luke Chen
>Priority: Major
>
> While running the producer performance tool in system tests for an old 
> client (ex: quota_test), we try to run with the dev branch's jar file, to 
> make sure it is backward compatible, as described 
> [here|https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/producer_performance.py#L86-L88].
> {code:java}
> # In order to ensure more consistent configuration between versions, always 
> use the ProducerPerformance tool from the development branch {code}
>  
> But in KAFKA-14525, by moving tools from the core module to a separate tools 
> module, we're actually breaking backward compatibility. We should fix the 
> system test. Maybe we should also mention this backward-compatibility issue 
> somewhere?
> Note:
> This is the command run in system test. Suppose it's testing old client 3.4.0 
> (file put under `~/Downloads/kafka_2.13-3.4.0` in my env), and running under 
> the latest trunk env.
> {code:java}
> > for file in ./tools/build/libs/kafka-tools*.jar; do 
> > CLASSPATH=$CLASSPATH:$file; done; for file in 
> > ./tools/build/dependant-libs*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; 
> > export CLASSPATH;  export 
> > KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:config/tools-log4j.properties";
> >  KAFKA_OPTS= KAFKA_HEAP_OPTS="-XX:+HeapDumpOnOutOfMemoryError" 
> > ~/Downloads/kafka_2.13-3.4.0/bin/kafka-run-class.sh 
> > org.apache.kafka.tools.ProducerPerformance --topic test_topic --num-records 
> > 5 --record-size 3000 --throughput -1 --producer-props 
> > bootstrap.servers=localhost:9092 client.id=overridden_id 
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/kafka/common/utils/ThroughputThrottler
>     at 
> org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101)
>     at 
> org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.kafka.common.utils.ThroughputThrottler
>     at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
>     at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520)
>     ... 2 more
> {code}
>  
>  
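One way the system test could fail fast instead of hitting NoClassDefFoundError mid-run is to probe the classpath first. This is a hypothetical sketch, not part of the actual test harness; the probe method name is made up:

```java
// Hypothetical compatibility probe (not part of the actual system test): check
// that a class is resolvable before invoking a tool against an older release's
// run scripts, instead of failing later with NoClassDefFoundError.
public class ClasspathProbe {
    static boolean isResolvable(String className) {
        try {
            // initialize=false: only resolve the class, don't run static initializers
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isResolvable("java.lang.String")); // true
        // False unless the Kafka tools jar and its dependencies are on the classpath:
        System.out.println(isResolvable("org.apache.kafka.common.utils.ThroughputThrottler"));
    }
}
```

In the failing command above, the tools jar is on the classpath but its relocated dependency is not, which is exactly what such a probe would surface before launching the perf run.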





[jira] [Commented] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Long for schema-less case

2023-07-23 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746198#comment-17746198
 ] 

Sagar Rao commented on KAFKA-10457:
---

[~yash.mayya], IIRC, you had worked on an issue related to data type 
conversions. Does this look similar? You might want to take a stab at it since 
you are already aware of the issue. Let me know.

> JsonConverter.toConnectData trims BigInteger to Long for schema-less case
> -
>
> Key: KAFKA-10457
> URL: https://issues.apache.org/jira/browse/KAFKA-10457
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Oleksandr Diachenko
>Assignee: Oleksandr Diachenko
>Priority: Critical
>
>  
> When _JsonConverter_ is configured with _schemas.enable=false_ and a value 
> exceeding _Long_ is passed, the result is incorrect since the converter 
> trims it to _Long:_
> {code:java}
> Map<String, Object> props = Collections.singletonMap("schemas.enable", 
> false);
> converter.configure(props, true);
> BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new 
> BigInteger("1"));
> String msg = value.toString();
> SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, 
> msg.getBytes());
> assertNull(schemaAndValue.schema());
> assertEquals(value, schemaAndValue.value());
> {code}
>  
>  Fails with:
>  
> {code:java}
> expected:<9223372036854775808> but was:<-9223372036854775808>
> Expected :9223372036854775808
> Actual :-9223372036854775808
> {code}
>  
>  





[jira] [Comment Edited] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2023-07-23 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746197#comment-17746197
 ] 

Sagar Rao edited comment on KAFKA-12283 at 7/24/23 5:02 AM:


The flakiness with this test was fixed as part of this change: 
https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L273.

We haven't seen flaky behaviour since this PR got merged. Closing. cc [~showuon]


was (Author: sagarrao):
The flakiness with this test was also fixed as part of this PR=> 
https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L273.

Closing. cc [~showuon]

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}





[jira] [Resolved] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-12283.
---
Resolution: Fixed

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}





[jira] [Commented] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2023-07-23 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746197#comment-17746197
 ] 

Sagar Rao commented on KAFKA-12283:
---

The flakiness with this test was also fixed as part of this PR: 
https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L273.

Closing. cc [~showuon]

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}





[jira] [Resolved] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-8391.
--
Resolution: Fixed

Fixed with https://github.com/apache/kafka/pull/12561

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: flaky-test
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}





[jira] [Comment Edited] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2023-07-23 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746195#comment-17746195
 ] 

Sagar Rao edited comment on KAFKA-8391 at 7/24/23 5:00 AM:
---

Just bumped upon this. The issue mentioned above 
(https://issues.apache.org/jira/browse/KAFKA-12495) was fixed last year and 
this specific test was re-enabled in 
https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L191.
We haven't noticed flakiness around this test since then. Closing this one. cc 
[~showuon]


was (Author: sagarrao):
Just bumped upon this. The issue mentioned above 
(https://issues.apache.org/jira/browse/KAFKA-12495)was fixed last year and this 
specific test was enabled 
https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L191.
 Closing this one. cc [~showuon]

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: flaky-test
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}







[jira] [Assigned] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-8391:


Assignee: Sagar Rao






[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms

2023-07-21 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-15229:
--
Description: 
The Kafka Connect config 
[task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]
 has a default value of 5s. As per its definition:

{noformat}
Amount of time to wait for tasks to shutdown gracefully. This is the total 
amount of time, not per task. All task have shutdown triggered, then they are 
waited on sequentially.{noformat}

it is the total timeout for all tasks to shut down; if multiple tasks are to be 
shut down, they are waited upon sequentially. This default is fine for smaller 
clusters with few tasks, but on a larger cluster the timeout can elapse and we 
will see many messages of the form

{noformat}
Graceful stop of task  failed.
{noformat}

When the graceful stop of a task fails, the task is cancelled, which means it 
won't send out a status update, so no `UNASSIGNED` status message is posted for 
that task. Say the task stop was triggered by a worker going down. If the 
cluster is configured to use the Incremental Cooperative Assignor, the task 
won't be reassigned until the scheduled.rebalance.delay.max.ms interval elapses. 
For that duration, the task shows up with status RUNNING whenever its status is 
queried, which can be confusing for users.

This problem can be exacerbated in cloud environments (like Kubernetes pods), 
because there is a high chance that the RUNNING status would be associated with 
an older worker_id that no longer exists in the cluster.

The net effect is not catastrophic, i.e. it won't lead to processing delays or 
loss of data, but the status of the task would be off. And if fast rebalances 
are happening under the Incremental Cooperative Assignor, that duration could 
be long as well.

So, the proposal is to increase the default to a higher value. I am thinking we 
can set it to 60s because, as far as I can see, it doesn't interfere with any 
other timeout that we have.

Also, while users can set this config on their clusters, it is a low-importance 
config that generally goes unnoticed, so I believe we should look to increase 
the default value.

I am tagging this as needs-kip because I believe we will need one.
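For operators who don't want to wait for a default change, the timeout can be raised today in the worker configuration. A minimal sketch (the 60s value simply mirrors the proposal above):

```properties
# connect-distributed.properties (worker config)
# Total time, across ALL tasks, to wait for graceful task shutdown.
# Default is 5000 (5s); raising it reduces spurious "Graceful stop of task failed" cancellations.
task.shutdown.graceful.timeout.ms=60000
```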





> Increase default value of task.shutdown.graceful.timeout.ms
> ---
>
> Key: KAFKA-15229
> URL: https://issues.apache.org/jira/browse/KAFKA-15229
> Project: Kafka
> 

[jira] [Created] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms

2023-07-21 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15229:
-

 Summary: Increase default value of 
task.shutdown.graceful.timeout.ms
 Key: KAFKA-15229
 URL: https://issues.apache.org/jira/browse/KAFKA-15229
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao





[jira] [Assigned] (KAFKA-15218) NPE will be thrown while deleting topic and fetch from follower concurrently

2023-07-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15218:
-

Assignee: Sagar Rao

> NPE will be thrown while deleting topic and fetch from follower concurrently
> 
>
> Key: KAFKA-15218
> URL: https://issues.apache.org/jira/browse/KAFKA-15218
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.5.0
>Reporter: Luke Chen
>Assignee: Sagar Rao
>Priority: Major
>
> When deleting topics, we'll first clear all the remoteReplicaMap when 
> stopPartitions 
> [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/server/ReplicaManager.scala#L554].
>  But at the same time, a fetch request may come in from a follower and try to 
> check whether the replica is eligible to be added to the ISR 
> [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/cluster/Partition.scala#L1001].
>  At this moment, an NPE will be thrown. Although it's harmless since the topic 
> is already deleted, it'd be better to prevent it from happening.
>  
>  
> {code:java}
> java.lang.NullPointerException: Cannot invoke 
> "kafka.cluster.Replica.stateSnapshot()" because the return value of 
> "kafka.utils.Pool.get(Object)" is null  at 
> kafka.cluster.Partition.isReplicaIsrEligible(Partition.scala:992) 
> ~[kafka_2.13-3.5.0.jar:?]  at 
> kafka.cluster.Partition.canAddReplicaToIsr(Partition.scala:974) 
> ~[kafka_2.13-3.5.0.jar:?]at 
> kafka.cluster.Partition.maybeExpandIsr(Partition.scala:947) 
> ~[kafka_2.13-3.5.0.jar:?]at 
> kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:866) 
> ~[kafka_2.13-3.5.0.jar:?]  at 
> kafka.cluster.Partition.fetchRecords(Partition.scala:1361) 
> ~[kafka_2.13-3.5.0.jar:?] at 
> kafka.server.ReplicaManager.read$1(ReplicaManager.scala:1164) 
> ~[kafka_2.13-3.5.0.jar:?]  at 
> kafka.server.ReplicaManager.$anonfun$readFromLocalLog$7(ReplicaManager.scala:1235)
>  ~[kafka_2.13-3.5.0.jar:?] at 
> scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) 
> ~[scala-library-2.13.10.jar:?]  at 
> scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) 
> ~[scala-library-2.13.10.jar:?] at 
> scala.collection.AbstractIterable.foreach(Iterable.scala:933) 
> ~[scala-library-2.13.10.jar:?] at 
> kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:1234) 
> ~[kafka_2.13-3.5.0.jar:?]at 
> kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1044) 
> ~[kafka_2.13-3.5.0.jar:?]   at 
> kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:994) 
> ~[kafka_2.13-3.5.0.jar:?] at 
> kafka.server.KafkaApis.handle(KafkaApis.scala:181) ~[kafka_2.13-3.5.0.jar:?] 
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) 
> ~[kafka_2.13-3.5.0.jar:?] at java.lang.Thread.run(Thread.java:1623) [?:?] 
> {code}
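A self-contained sketch of the kind of guard that avoids this NPE (hypothetical names, not the real Partition/Replica classes): a replica missing from the map, e.g. because the topic was just deleted, is simply treated as not ISR-eligible instead of being dereferenced.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative stand-in for the remoteReplicaMap cleared during topic deletion;
// the value models a replica's last fetch offset.
public class IsrEligibilitySketch {
    static final Map<Integer, Long> replicaFetchOffsets = new ConcurrentHashMap<>();

    // Null-safe eligibility check: a replica absent from the map is not eligible.
    static boolean isReplicaIsrEligible(int replicaId, long leaderEndOffset) {
        Long fetchOffset = replicaFetchOffsets.get(replicaId);
        if (fetchOffset == null) {
            // Map already cleared by stopPartitions: not eligible, and no NPE.
            return false;
        }
        return fetchOffset >= leaderEndOffset;
    }

    public static void main(String[] args) {
        replicaFetchOffsets.put(1, 100L);
        System.out.println(isReplicaIsrEligible(1, 100L)); // true
        replicaFetchOffsets.clear(); // simulates topic deletion clearing the map
        System.out.println(isReplicaIsrEligible(1, 100L)); // false instead of an NPE
    }
}
```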





[jira] [Resolved] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect

2023-07-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-12525.
---
Resolution: Fixed

> Inaccurate task status due to status record interleaving in fast rebalances 
> in Connect
> --
>
> Key: KAFKA-12525
> URL: https://issues.apache.org/jira/browse/KAFKA-12525
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1
>Reporter: Konstantine Karantasis
>Assignee: Sagar Rao
>Priority: Major
>
> When a task is stopped in Connect it produces an {{UNASSIGNED}} status 
> record. 
> Equivalently, when a task is started or restarted in Connect it produces an 
> {{RUNNING}} status record in the Connect status topic.
> At the same time, rebalances are decoupled from task start and stop. These 
> operations happen in a separate executor, outside of the main worker thread 
> that performs the rebalance.
> Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by 
> the worker that is sending them. This worker is using the 
> {{StatusBackingStore#putSafe}} method that will reject any stale status 
> messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker 
> is aware of the newer status record that declares a task as {{RUNNING}}.
> In cases of fast consecutive rebalances where a task is revoked from one 
> worker and assigned to another one, it has been observed that there is a 
> small time window and thus a race condition during which a {{RUNNING}} status 
> record in the new generation is produced and is immediately followed by a 
> delayed {{UNASSIGNED}} status record belonging to the same or a previous 
> generation before the worker that sends this message reads the {{RUNNING}} 
> status record that corresponds to the latest generation.
> A couple of options are available to remediate this race condition. 
> For example, a worker that has started a task can re-write the {{RUNNING}} 
> status message in the topic if it reads a stale {{UNASSIGNED}} message from a 
> previous generation (one that should have been fenced). 
> Another option is to ignore stale {{UNASSIGNED}} messages (messages from an 
> earlier generation than the one in which the task had {{RUNNING}} status).
> Worth noting that when this race condition takes place, besides the 
> inaccurate status representation, the actual execution of the tasks remains 
> unaffected (e.g. the tasks are running correctly even though they appear as 
> {{UNASSIGNED}}). 
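The second remediation option above can be illustrated with a small, self-contained sketch (hypothetical names, not Connect's actual StatusBackingStore API): an UNASSIGNED record from an older rebalance generation than the latest RUNNING record is simply dropped.

```java
// Illustrative generation-fencing rule for task status records; the state
// strings and generation numbers are assumptions for the sketch, not Connect's API.
public class StatusFencingSketch {
    // Returns true if the incoming status record should be applied.
    static boolean shouldApply(String incomingState, int incomingGeneration,
                               String currentState, int currentGeneration) {
        boolean staleUnassigned = "UNASSIGNED".equals(incomingState)
                && "RUNNING".equals(currentState)
                && incomingGeneration < currentGeneration;
        return !staleUnassigned; // drop only stale UNASSIGNED records
    }

    public static void main(String[] args) {
        // Delayed UNASSIGNED from generation 5 arrives after RUNNING from generation 6.
        System.out.println(shouldApply("UNASSIGNED", 5, "RUNNING", 6)); // false: ignored
        // A genuine stop in the current generation is still applied.
        System.out.println(shouldApply("UNASSIGNED", 6, "RUNNING", 6)); // true
    }
}
```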





[jira] [Updated] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-07-01 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-15127:
--
Description: 
This has been listed as [Future 
Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors]
 in KIP-875. Now that the delete-offsets mechanism is in place, we can take 
this up, which will allow connector names to be reused after connector 
deletion.

Note that we can get started with this once the offsets management API has been 
adopted by a few users and we have feedback on its functioning.



> Allow offsets to be reset at the same time a connector is deleted.
> --
>
> Key: KAFKA-15127
> URL: https://issues.apache.org/jira/browse/KAFKA-15127
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> This has been listed as [Future 
> Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors]
>  in KIP-875. Now that the delete offsets mechanism is also in place, we can 
> take this up which will allow connector names to be reused after connector 
> deletion. 
>  
> Note that we can get started with this once the Offsets management API has 
> been adopted by a few users and we have got feedback around it's functioning.





[jira] [Assigned] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-07-01 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15127:
-

Assignee: (was: Sagar Rao)






[jira] [Commented] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-06-29 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738704#comment-17738704
 ] 

Sagar Rao commented on KAFKA-15127:
---

I don't intend to work on this immediately or in the near future. It's just 
created for future reference.






[jira] [Assigned] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-06-27 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15127:
-

Assignee: Sagar Rao






[jira] [Created] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-06-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15127:
-

 Summary: Allow offsets to be reset at the same time a connector is 
deleted.
 Key: KAFKA-15127
 URL: https://issues.apache.org/jira/browse/KAFKA-15127
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sagar Rao







[jira] [Resolved] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly

2023-06-18 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14913.
---
Resolution: Fixed

> Migrate DistributedHerder Executor shutdown to use 
> ThreadUtils#shutdownExecutorServiceQuietly
> -
>
> Key: KAFKA-14913
> URL: https://issues.apache.org/jira/browse/KAFKA-14913
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Minor
>
> Some context here: 
> https://github.com/apache/kafka/pull/13557#issuecomment-1509738740





[jira] [Commented] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly

2023-06-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733859#comment-17733859
 ] 

Sagar Rao commented on KAFKA-14913:
---

[~elkkhan], the PR for this fix was already merged: 
https://github.com/apache/kafka/pull/13594. For some reason, the ticket didn't 
get updated.






[jira] [Assigned] (KAFKA-14131) KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop

2023-06-07 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-14131:
-

Assignee: Sambhav Jain  (was: Sagar Rao)

> KafkaBasedLog#readToLogEnd() may accidentally fall into an infinite loop
> --
>
> Key: KAFKA-14131
> URL: https://issues.apache.org/jira/browse/KAFKA-14131
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Justinwins
>Assignee: Sambhav Jain
>Priority: Major
>
> When a herder starts, its KafkaOffsetBackingStore will readToLogEnd() on the 
> DistributedHerder.herderExecutor thread (named "Distributed-connect-", e.g. 
> Distributed-connect-28-1), which may take a few minutes.
> If another thread tries to shut down this herder, it will block for 
> task.shutdown.graceful.timeout.ms before the 
> DistributedHerder.herderExecutor is interrupted. 
> And if the thread in DistributedHerder.herderExecutor is interrupted, 
> KafkaOffsetBackingStore.readToLogEnd() will poll(Integer.MAX_VALUE) and log 
> "Error polling" because the read has been interrupted; then 
> consumer.position will not advance and readToLogEnd() falls into an infinite 
> loop.
>  
> {code:java}
> private void readToLogEnd() {
>     Set<TopicPartition> assignment = consumer.assignment();
>     Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
>     log.trace("Reading to end of log offsets {}", endOffsets);
>     while (!endOffsets.isEmpty()) { // this loop will never jump out
>         Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
>         while (it.hasNext()) {
>             Map.Entry<TopicPartition, Long> entry = it.next();
>             TopicPartition topicPartition = entry.getKey();
>             long endOffset = entry.getValue();
>             // when the thread is in interrupted status, consumer.position will not advance
>             long lastConsumedOffset = consumer.position(topicPartition);
>             if (lastConsumedOffset >= endOffset) {
>                 log.trace("Read to end offset {} for {}", endOffset, topicPartition);
>                 it.remove();
>             } else {
>                 log.trace("Behind end offset {} for {}; last-read offset is {}",
>                         endOffset, topicPartition, lastConsumedOffset);
>                 // here, poll() will catch InterruptedException and log it without rethrowing
>                 poll(Integer.MAX_VALUE);
>                 break;
>             }
>         }
>     }
> } {code}
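The loop above only exits when consumer.position catches up with the end offsets, which can never happen once the thread is interrupted. A minimal sketch of one possible guard (illustrative only; plain maps stand in for the consumer, and this is not the actual Kafka fix) is to bail out of the loop as soon as the interrupt flag is set:

```java
import java.util.Iterator;
import java.util.Map;

public class ReadToLogEndSketch {
    /**
     * Simulates the readToLogEnd() loop. "positions" never advances while the
     * reading thread is interrupted (poll() swallows the InterruptedException),
     * so without the interrupt check the loop would spin forever.
     * Returns true if the log end was reached, false if we bailed out early.
     */
    static boolean readToLogEnd(Map<String, Long> endOffsets, Map<String, Long> positions) {
        while (!endOffsets.isEmpty()) {
            // Hypothetical guard: stop looping once the thread is interrupted,
            // instead of polling forever with a stuck consumer position.
            if (Thread.currentThread().isInterrupted()) {
                return false;
            }
            Iterator<Map.Entry<String, Long>> it = endOffsets.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> entry = it.next();
                long lastConsumed = positions.getOrDefault(entry.getKey(), 0L);
                if (lastConsumed >= entry.getValue()) {
                    it.remove(); // caught up on this partition
                } else {
                    // stand-in for poll(Integer.MAX_VALUE); an interrupted
                    // consumer makes no progress here
                    break;
                }
            }
        }
        return true;
    }
}
```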





[jira] [Updated] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation

2023-05-31 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-15041:
--
Description: 
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] 
allows source connectors to create topics even when the broker doesn't allow 
automatic topic creation. It does so by checking, for every record, whether a 
topic needs to be created 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]).
 To avoid repeatedly checking for topic presence via the admin client, it also 
maintains a cache of the topics it has created and doesn't try to create them 
again. This makes topic creation work even when brokers don't support automatic 
topic creation.

However, let's say the topic gets created initially and later gets deleted 
while the connector is still running, and the brokers don't support automatic 
topic creation. In that case, the connector has cached the topic it already 
created and won't recreate it because the cache never updates; and since the 
broker doesn't support topic creation, the logs just fill up with messages like 

 
{code:java}
Error while fetching metadata with correlation id 3260 : 
{connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code}
 

This can become a problem in environments where brokers don't allow topic 
creation. We need a way to refresh the topic cache for such cases.
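A hedged sketch of what "refreshing the topic cache" could look like, e.g. putting a TTL on cache entries so a deleted topic is eventually re-checked and re-created (class and method names are hypothetical, not part of the Connect runtime):

```java
import java.util.HashMap;
import java.util.Map;

public class TopicCreationCache {
    // Hypothetical TTL cache: remembers when each topic was last verified, so
    // a topic that was deleted out-of-band is re-checked (and re-created)
    // after the TTL expires instead of being trusted forever.
    private final Map<String, Long> lastVerifiedMs = new HashMap<>();
    private final long ttlMs;

    public TopicCreationCache(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    /** Returns true if the runtime should (re)check or (re)create the topic now. */
    public boolean shouldCheck(String topic, long nowMs) {
        Long seen = lastVerifiedMs.get(topic);
        return seen == null || nowMs - seen >= ttlMs;
    }

    /** Record that the topic was just confirmed to exist (or was created). */
    public void markVerified(String topic, long nowMs) {
        lastVerifiedMs.put(topic, nowMs);
    }
}
```

With a TTL of, say, ten seconds, the worker would re-issue at most one metadata check per topic per TTL window, rather than one per record.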

  was:
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] 
allows source connectors to create topics even when the broker doesn't allow 
automatic topic creation. It does so by checking, for every record, whether a 
topic needs to be created 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]).
 To avoid repeatedly checking for topic presence via the admin client, it also 
maintains a cache of the topics it has created and doesn't try to create them 
again. This makes topic creation work even when brokers don't support automatic 
topic creation.

However, let's say the topic gets created initially and later gets deleted 
while the connector is still running, and the brokers don't support automatic 
topic creation. In that case, the connector has cached the topic it already 
created and won't recreate it because the cache never updates; and since the 
broker doesn't support topic creation, the logs just fill up with messages like 

 
{code:java}
Error while fetching metadata with correlation id 3260 : 
{connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code}
 

This can become a problem in environments where brokers don't allow topic 
creation. We need a way to refresh the topic cache for such cases.


> Source Connector auto topic creation fails when topic is deleted and brokers 
> don't support auto topic creation
> --
>
> Key: KAFKA-15041
> URL: https://issues.apache.org/jira/browse/KAFKA-15041
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
>  allows source connectors to create topics even when the broker doesn't allow 
> automatic topic creation. It does so by checking, for every record, whether a 
> topic needs to be created 
> ([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]).
>  To avoid repeatedly checking for topic presence via the admin client, it 
> also maintains a cache of the topics it has created and doesn't try to create 
> them again. This makes topic creation work even when brokers don't support 
> automatic topic creation.
> However, let's say the topic gets created initially and later gets deleted 
> while the connector is still running, and the brokers don't support automatic 
> topic creation. In that case, the connector has cached the topic it already 
> created and won't recreate it because the cache never updates; and since the 
> broker doesn't support topic creation, the logs just fill up with messages 
> like 
>  
> {code:java}
> Error while fetching metadata with correlation id 3260 : 
> {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code}
>  
> This can become a problem in environments where brokers don't allow topic 
> creation. We need a way to refresh the topic cache for such cases.




[jira] [Updated] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation

2023-05-31 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-15041:
--
Description: 
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] 
allows source connectors to create topics even when the broker doesn't allow 
automatic topic creation. It does so by checking, for every record, whether a 
topic needs to be created 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]).
 To avoid repeatedly checking for topic presence via the admin client, it also 
maintains a cache of the topics it has created and doesn't try to create them 
again. This makes topic creation work even when brokers don't support automatic 
topic creation.

However, let's say the topic gets created initially and later gets deleted 
while the connector is still running, and the brokers don't support automatic 
topic creation. In that case, the connector has cached the topic it already 
created and won't recreate it because the cache never updates; and since the 
broker doesn't support topic creation, the logs just fill up with messages like 

 
{code:java}
Error while fetching metadata with correlation id 3260 : 
{connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code}
 

This can become a problem in environments where brokers don't allow topic 
creation. We need a way to refresh the topic cache for such cases.

  was:
[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] 
allows source connectors to create topics even when the broker doesn't allow 
automatic topic creation. It does so by checking, for every record, whether a 
topic needs to be created 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]).
 To avoid repeatedly checking for topic presence via the admin client, it also 
maintains a cache of the topics it has created and doesn't try to create them 
again. This makes topic creation work even when brokers don't support automatic 
topic creation.

However, let's say the topic gets created initially and later gets deleted 
while the connector is still running, and the brokers don't support automatic 
topic creation. In that case, the connector has cached the topic it already 
created and won't recreate it because the cache never updates; and since the 
broker doesn't support topic creation, the logs just fill up with messages like 

```

 Error while fetching metadata with correlation id 3260 : 
\{connect-test=UNKNOWN_TOPIC_OR_PARTITION} 

```

 

This can become a problem in environments where brokers don't allow topic 
creation. We need a way to refresh the topic cache for such cases.


> Source Connector auto topic creation fails when topic is deleted and brokers 
> don't support auto topic creation
> --
>
> Key: KAFKA-15041
> URL: https://issues.apache.org/jira/browse/KAFKA-15041
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
>  allows source connectors to create topics even when the broker doesn't allow 
> automatic topic creation. It does so by checking, for every record, whether a 
> topic needs to be created 
> ([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]).
>  To avoid repeatedly checking for topic presence via the admin client, it 
> also maintains a cache of the topics it has created and doesn't try to create 
> them again. This makes topic creation work even when brokers don't support 
> automatic topic creation.
> However, let's say the topic gets created initially and later gets deleted 
> while the connector is still running, and the brokers don't support automatic 
> topic creation. In that case, the connector has cached the topic it already 
> created and won't recreate it because the cache never updates; and since the 
> broker doesn't support topic creation, the logs just fill up with messages 
> like 
>  
> {code:java}
> Error while fetching metadata with correlation id 3260 : 
> {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code}
>  
> This can become a problem in environments where brokers don't allow topic 
> creation. We need a way to refresh the topic cache for such cases.




[jira] [Commented] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation

2023-05-31 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727894#comment-17727894
 ] 

Sagar Rao commented on KAFKA-15041:
---

Ideally a worker restart should fix this, but it didn't in my local 
testing. 

> Source Connector auto topic creation fails when topic is deleted and brokers 
> don't support auto topic creation
> --
>
> Key: KAFKA-15041
> URL: https://issues.apache.org/jira/browse/KAFKA-15041
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics]
>  allows source connectors to create topics even when the broker doesn't allow 
> automatic topic creation. It does so by checking, for every record, whether a 
> topic needs to be created 
> ([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]).
>  To avoid repeatedly checking for topic presence via the admin client, it 
> also maintains a cache of the topics it has created and doesn't try to create 
> them again. This makes topic creation work even when brokers don't support 
> automatic topic creation.
> However, let's say the topic gets created initially and later gets deleted 
> while the connector is still running, and the brokers don't support automatic 
> topic creation. In that case, the connector has cached the topic it already 
> created and won't recreate it because the cache never updates; and since the 
> broker doesn't support topic creation, the logs just fill up with messages 
> like 
> ```
>  Error while fetching metadata with correlation id 3260 : 
> \{connect-test=UNKNOWN_TOPIC_OR_PARTITION} 
> ```
>  
> This can become a problem in environments where brokers don't allow topic 
> creation. We need a way to refresh the topic cache for such cases.





[jira] [Created] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation

2023-05-31 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15041:
-

 Summary: Source Connector auto topic creation fails when topic is 
deleted and brokers don't support auto topic creation
 Key: KAFKA-15041
 URL: https://issues.apache.org/jira/browse/KAFKA-15041
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] 
allows source connectors to create topics even when the broker doesn't allow 
automatic topic creation. It does so by checking, for every record, whether a 
topic needs to be created 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]).
 To avoid repeatedly checking for topic presence via the admin client, it also 
maintains a cache of the topics it has created and doesn't try to create them 
again. This makes topic creation work even when brokers don't support automatic 
topic creation.

However, let's say the topic gets created initially and later gets deleted 
while the connector is still running, and the brokers don't support automatic 
topic creation. In that case, the connector has cached the topic it already 
created and won't recreate it because the cache never updates; and since the 
broker doesn't support topic creation, the logs just fill up with messages like 

```

 Error while fetching metadata with correlation id 3260 : 
\{connect-test=UNKNOWN_TOPIC_OR_PARTITION} 

```

 

This can become a problem in environments where brokers don't allow topic 
creation. We need a way to refresh the topic cache for such cases.





[jira] [Commented] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2023-05-30 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727444#comment-17727444
 ] 

Sagar Rao commented on KAFKA-15018:
---

Thanks [~yash.mayya]. I have assigned this to myself. I went through your two 
approaches and feel that option 1 (the option also mentioned by Chris) might be 
the cleaner way to handle this case. While the second option is viable, it 
would need some more changes to be incorporated correctly, as you pointed 
out.

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.
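The tombstone hazard described above reduces to a few lines: once the tombstone lands only in the per-connector store, the fallback-on-global lookup resurrects the stale offset. A simplified sketch, with plain maps standing in for the two offsets topics (names are illustrative, not the worker's actual API):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetFallbackSketch {
    // Hypothetical stand-ins for the per-connector and global offset stores.
    static final Map<String, Long> perConnector = new HashMap<>();
    static final Map<String, Long> global = new HashMap<>();

    /** Per-connector offsets take precedence; the global topic is only a fallback. */
    static Long offsetFor(String partition) {
        Long local = perConnector.get(partition);
        return local != null ? local : global.get(partition);
    }
}
```

If the tombstone write to the global store fails, `offsetFor` happily falls back to the stale global entry instead of reporting that no offset exists.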





[jira] [Assigned] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors

2023-05-30 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-15018:
-

Assignee: Sagar Rao

> Potential tombstone offsets corruption for exactly-once source connectors
> -
>
> Key: KAFKA-15018
> URL: https://issues.apache.org/jira/browse/KAFKA-15018
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> When exactly-once support is enabled for source connectors, source offsets 
> can potentially be written to two different offsets topics: a topic specific 
> to the connector, and the global offsets topic (which was used for all 
> connectors prior to KIP-618 / version 3.3.0).
> Precedence is given to offsets in the per-connector offsets topic, but if 
> none are found for a given partition, then the global offsets topic is used 
> as a fallback.
> When committing offsets, a transaction is used to ensure that source records 
> and source offsets are written to the Kafka cluster targeted by the source 
> connector. This transaction only includes the connector-specific offsets 
> topic. Writes to the global offsets topic take place after writes to the 
> connector-specific offsets topic have completed successfully, and if they 
> fail, a warning message is logged, but no other action is taken.
> Normally, this ensures that, for offsets committed by exactly-once-supported 
> source connectors, the per-connector offsets topic is at least as up-to-date 
> as the global offsets topic, and sometimes even ahead.
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset 
> is successfully written to the per-connector offsets topic, but cannot be 
> written to the global offsets topic, then the global offsets topic will still 
> contain that source offset, but the per-connector topic will not. Due to the 
> fallback-on-global logic used by the worker, if a task requests offsets for 
> one of the tombstoned partitions, the worker will provide it with the offsets 
> present in the global offsets topic, instead of indicating to the task that 
> no offsets can be found.





[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct

2023-05-21 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724615#comment-17724615
 ] 

Sagar Rao commented on KAFKA-15005:
---

Thank you [~LucentWong] , I have updated the condition in the PR to include 
stale UNASSIGNED status from both previous and current generation. 

> Status of KafkaConnect task not correct
> ---
>
> Key: KAFKA-15005
> URL: https://issues.apache.org/jira/browse/KAFKA-15005
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.0.0, 3.3.2
>Reporter: Yu Wang
>Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found several tasks that were 
> always in *UNASSIGNED* status, even though the actual tasks had already 
> started. So we dumped the payload of the Kafka Connect status topic, and 
> found that the last two status changes were *RUNNING* followed by 
> {*}UNASSIGNED{*}.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643}
>  {code}
> But usually the RUNNING status should be appended after the UNASSIGNED, 
> because the worker coordinator revokes tasks before starting new ones.
> Then we checked the logs of our MM2 workers and found that, during that time, 
> a task was revoked on worker-2 and started on worker-1.
>  
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, 
> groupId=__group] Starting task task-7 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
>  
> So I think the incorrect status was caused by the revoked task finishing 
> later than the newly started task, which made the UNASSIGNED status get 
> appended to the status topic after the RUNNING status. 
>  
> After reading the code of DistributedHerder, I found that task revoking runs 
> in a thread pool, and the revoke operation just returns after submitting all 
> the callables. So I think, even on the same worker, there is no guarantee 
> that the revoke operation will always finish before the new tasks start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
> callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance"); {code}
>  
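One hedged way to rule out the reordering described above (a sketch under the assumption that blocking the rebalance on task stops is acceptable, not the actual Connect change) is to wait for the stop callables to complete before starting replacements, e.g. via ExecutorService.invokeAll:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StopBeforeStartSketch {
    /**
     * Hypothetical variant of startAndStop(): invokeAll() blocks until every
     * stop callable has completed, so a revoked task cannot publish its
     * UNASSIGNED status after the replacement task has written RUNNING.
     * Returns the ordered status log for inspection.
     */
    static List<String> stopThenStart(List<String> toStop, List<String> toStart) {
        List<String> statusLog = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Callable<Void>> stops = new ArrayList<>();
        for (String task : toStop) {
            stops.add(() -> { statusLog.add(task + ":UNASSIGNED"); return null; });
        }
        try {
            pool.invokeAll(stops); // wait for all revocations to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
        for (String task : toStart) {
            statusLog.add(task + ":RUNNING"); // starts happen strictly afterwards
        }
        pool.shutdown();
        return statusLog;
    }
}
```

With fire-and-forget submission (as in the quoted snippet) the same two writes can land in either order; invokeAll forces UNASSIGNED before RUNNING.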





[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct

2023-05-19 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724481#comment-17724481
 ] 

Sagar Rao commented on KAFKA-15005:
---

Thanks for the comment [~LucentWong]. Yeah, in that PR I had avoided the 
current generation. One thing I wanted to check with you (since you have 
rightfully masked the worker_ids from the status topic dump): does the RUNNING 
message that you see correspond to worker_1, i.e. the worker where the task 
started, and the UNASSIGNED status message to worker_2? I am pretty sure that 
is the case, but just double-checking. In that case, it should be very 
safe to add that check of generation equality as well.

> Status of KafkaConnect task not correct
> ---
>
> Key: KAFKA-15005
> URL: https://issues.apache.org/jira/browse/KAFKA-15005
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.0.0, 3.3.2
>Reporter: Yu Wang
>Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found several tasks that were 
> always in *UNASSIGNED* status, even though the actual tasks had already 
> started. So we dumped the payload of the Kafka Connect status topic, and 
> found that the last two status changes were *RUNNING* followed by 
> {*}UNASSIGNED{*}.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"RUNNING","trace":null,"worker_id":"x","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"UNASSIGNED","trace":null,"worker_id":"x","generation":437643}
>  {code}
> But usually the RUNNING status should be appended after the UNASSIGNED, 
> because the worker coordinator revokes tasks before starting new ones.
> Then we checked the logs of our MM2 workers and found that, during that time, 
> a task was revoked on worker-2 and started on worker-1.
>  
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, 
> groupId=__group] Starting task task-7 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
>  
> So I think the incorrect status was caused by the revoked task finishing 
> later than the newly started task, which made the UNASSIGNED status get 
> appended to the status topic after the RUNNING status. 
>  
> After reading the code of DistributedHerder, I found that task revoking runs 
> in a thread pool, and the revoke operation just returns after submitting all 
> the callables. So I think, even on the same worker, there is no guarantee 
> that the revoke operation will always finish before the new tasks start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
> callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance"); {code}
>  





[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct

2023-05-18 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17723885#comment-17723885
 ] 

Sagar Rao commented on KAFKA-15005:
---

hey [~LucentWong] thanks for reporting this. This looks very similar to 
https://issues.apache.org/jira/browse/KAFKA-12525 and has been present for a 
while. There's also a PR submitted for that bug. 

> Status of KafkaConnect task not correct
> ---
>
> Key: KAFKA-15005
> URL: https://issues.apache.org/jira/browse/KAFKA-15005
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.0.0, 3.3.2
>Reporter: Yu Wang
>Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found several tasks that were 
> always in *UNASSIGNED* status, even though the actual tasks had already 
> started. So we dumped the payload of the Kafka Connect status topic, and 
> found that the last two status changes were *RUNNING* followed by 
> {*}UNASSIGNED{*}.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"RUNNING","trace":null,"worker_id":"x","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"UNASSIGNED","trace":null,"worker_id":"x","generation":437643}
>  {code}
> But usually the RUNNING status should be appended after the UNASSIGNED, 
> because the worker coordinator revokes tasks before starting new ones.
> Then we checked the logs of our MM2 workers and found that, during that time, 
> a task was revoked on worker-2 and started on worker-1.
>  
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, 
> groupId=__group] Starting task task-7 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
>  
> So I think the incorrect status was caused by the revoked task finishing 
> later than the newly started task, which made the UNASSIGNED status get 
> appended to the status topic after the RUNNING status. 
>  
> After reading the code of DistributedHerder, I found that task revoking runs 
> in a thread pool, and the revoke operation just returns after submitting all 
> the callables. So I think, even on the same worker, there is no guarantee 
> that the revoke operation will always finish before the new tasks start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
> callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance"); {code}
>  





[jira] [Assigned] (KAFKA-13109) WorkerSourceTask is not enforcing the errors.retry.timeout and errors.retry.delay.max.ms parameters in case of a RetriableException during task.poll()

2023-05-13 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13109:
-

Assignee: Sagar Rao

> WorkerSourceTask is not enforcing the errors.retry.timeout and 
> errors.retry.delay.max.ms parameters in case of a RetriableException during 
> task.poll()
> --
>
> Key: KAFKA-13109
> URL: https://issues.apache.org/jira/browse/KAFKA-13109
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.8.0
>Reporter: Damien Gasparina
>Assignee: Sagar Rao
>Priority: Major
>
> It seems that {{errors.retry.timeout}} is not enforced if 
> {{RetriableException}} is thrown in the {{poll()}} of a SourceTask.
> Looking at the Kafka Connect source code:
>  * If a task throws a {{RetriableException}} during a {{poll()}}, the connect 
> runtime catches it and returns null: 
> [https://github.com/apache/kafka/blob/2.8.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L273-L277]
>  * Then, {{toSend}} is set to null, and the runtime continues the loop and 
> re-executes the next iteration of poll without any delay 
> [https://github.com/apache/kafka/blob/2.8.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L240-L246]
>  
> This implies that, if the {{poll()}} is throwing a {{RetriableException}}:
>  * {{errors.retry.timeout}} is ignored and the task will retry indefinitely
>  * there would be no delay between each retry, {{errors.retry.delay.max.ms}} 
> is ignored, causing potential high resource utilization and log flooding
>  
> My understanding of 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect]
>  is that {{errors.retry.timeout}} and {{errors.retry.delay.max.ms}} should 
> have been respected in case of a {{RetriableException}} during a Source Task 
> {{poll()}}.
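
The retry semantics the reporter expects can be sketched as follows. This is an illustrative Java sketch, not WorkerSourceTask's actual code: the class, method, and parameter names are hypothetical stand-ins for the errors.retry.timeout and errors.retry.delay.max.ms behavior described in KIP-298.

```java
import java.util.List;

/**
 * Illustrative sketch (not Kafka's implementation) of a poll() retry loop
 * that honors an overall retry timeout and a capped backoff delay.
 */
public class RetriablePollSketch {

    /** Hypothetical stand-in for org.apache.kafka.connect.errors.RetriableException. */
    static class RetriableException extends RuntimeException {}

    interface Poller<T> { List<T> poll() throws RetriableException; }

    static <T> List<T> pollWithRetry(Poller<T> task,
                                     long retryTimeoutMs,   // errors.retry.timeout
                                     long maxDelayMs)       // errors.retry.delay.max.ms
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + retryTimeoutMs;
        long delayMs = 50; // initial backoff, doubled on each retry up to maxDelayMs
        while (true) {
            try {
                return task.poll();
            } catch (RetriableException e) {
                if (System.currentTimeMillis() >= deadline) {
                    throw e; // retry timeout exhausted: surface the failure
                }
                Thread.sleep(delayMs); // back off instead of spinning on poll()
                delayMs = Math.min(delayMs * 2, maxDelayMs);
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] attempts = {0};
        List<String> out = pollWithRetry(() -> {
            if (++attempts[0] < 3) throw new RetriableException();
            return List.of("record");
        }, 5_000, 200);
        System.out.println(out + " after " + attempts[0] + " attempts"); // [record] after 3 attempts
    }
}
```

Contrast this with the behavior described above, where the runtime swallows the exception and immediately loops, so neither config has any effect on poll() failures.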





[jira] [Commented] (KAFKA-13156) KafkaStatusBackingStore making incorrect assumption about order of task and connector delete events

2023-05-13 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722367#comment-17722367
 ] 

Sagar Rao commented on KAFKA-13156:
---

Hey [~ChrisEgerton], I assigned this to myself (I hope that's ok). I see it 
has been open for a while. I can send out a PR for this.

> KafkaStatusBackingStore making incorrect assumption about order of task and 
> connector delete events
> ---
>
> Key: KAFKA-13156
> URL: https://issues.apache.org/jira/browse/KAFKA-13156
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nigel Liang
>Assignee: Sagar Rao
>Priority: Minor
>
> When a connector remove message is read from the status topic, 
> `KafkaStatusBackingStore` removes both the connector and associated tasks 
> from its connector and tasks cache. However, due to the potentially 
> partitioned nature of the status topic, there is no guarantee about the order 
> of delivery 
> for the connector removal message and task status messages for the same 
> connector. If the connector is rapidly removed and recreated, the status 
> backing store may see the task running events before the connector removal, 
> in which case the running tasks would be removed from the tasks cache.
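
One possible guard for this race (purely a sketch; these names are hypothetical and this is not KafkaStatusBackingStore's code) is to tag each status message with a generation, and have a connector-removal event evict only task statuses from the same or an older generation, so a status written by the recreated connector survives an out-of-order removal:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical generation-guarded status cache; not Kafka's implementation. */
public class StatusCacheSketch {
    // task name -> generation of its most recent status event
    final Map<String, Long> taskGeneration = new HashMap<>();

    void onTaskStatus(String task, long generation) {
        // Keep the newest generation seen for this task.
        taskGeneration.merge(task, generation, Math::max);
    }

    void onConnectorRemoved(String taskPrefix, long removalGeneration) {
        // Evict only tasks whose last status is not newer than the removal.
        taskGeneration.entrySet().removeIf(e ->
                e.getKey().startsWith(taskPrefix)
                        && e.getValue() <= removalGeneration);
    }

    public static void main(String[] args) {
        StatusCacheSketch cache = new StatusCacheSketch();
        cache.onTaskStatus("conn-0", 2);     // recreated connector's task status arrives first
        cache.onConnectorRemoved("conn", 1); // stale removal event delivered late
        System.out.println(cache.taskGeneration.keySet()); // [conn-0] survives
    }
}
```

Without the generation check, the late removal event would wrongly evict the running task's status, which is the bug described above.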





[jira] [Assigned] (KAFKA-13156) KafkaStatusBackingStore making incorrect assumption about order of task and connector delete events

2023-05-13 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-13156:
-

Assignee: Sagar Rao  (was: Chris Egerton)

> KafkaStatusBackingStore making incorrect assumption about order of task and 
> connector delete events
> ---
>
> Key: KAFKA-13156
> URL: https://issues.apache.org/jira/browse/KAFKA-13156
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nigel Liang
>Assignee: Sagar Rao
>Priority: Minor
>
> When a connector remove message is read from the status topic, 
> `KafkaStatusBackingStore` removes both the connector and associated tasks 
> from its connector and tasks cache. However, due to the potentially 
> partitioned nature of the status topic, there is no guarantee about the order 
> of delivery 
> for the connector removal message and task status messages for the same 
> connector. If the connector is rapidly removed and recreated, the status 
> backing store may see the task running events before the connector removal, 
> in which case the running tasks would be removed from the tasks cache.





[jira] [Updated] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-13 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14956:
--
Labels: flaky-test  (was: )

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>  Labels: flaky-test
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at 
> 

[jira] [Updated] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-13 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14956:
--
Component/s: KafkaConnect

> Flaky test 
> org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
> --
>
> Key: KAFKA-14956
> URL: https://issues.apache.org/jira/browse/KAFKA-14956
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Yash Mayya
>Priority: Major
>
> ```
> h4. Error
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
> h4. Stacktrace
> org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
> Sink connector consumer group offsets should catch up to the topic end 
> offsets ==> expected:  but was: 
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
>  at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
>  at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
>  at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
>  at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
>  at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
>  at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
>  at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
>  at 
> app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
>  at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
>  at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
>  at 
> app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>  at 
> app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>  at 
> app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>  at 
> app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>  at 
> app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>  at 
> app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>  at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>  at 
> app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>  at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>  at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>  at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>  at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>  at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>  at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>  at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>  at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
>  at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
>  at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
>  at 
> 

[jira] [Updated] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs

2023-05-13 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14971:
--
Labels: flaky-test mirror-maker  (was: )

> Flaky Test 
> org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
> --
>
> Key: KAFKA-14971
> URL: https://issues.apache.org/jira/browse/KAFKA-14971
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Priority: Major
>  Labels: flaky-test, mirror-maker
>
> The test testSyncTopicConfigs in 
> `org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs`
>  seems to be flaky. Found here: 
> [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]
> Ran it locally against the [same PR|https://github.com/apache/kafka/pull/13594] and it passed.
>  
>  
> {code:java}
> org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
> because it's explicitly defined on the target topic! ==> expected: <2000> but 
> was: <8640>
> Stacktrace
> org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
> because it's explicitly defined on the target topic! ==> expected: <2000> but 
> was: <8640>
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
> at 
> app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
> at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
> at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
> at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153)
> at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758)
> at 
> app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325)
> at 
> app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306)
> at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296)
> at 
> app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752)
> at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
> Method)
> at 
> java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
> at 
> java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
> at 
> app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
> at 
> app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
> at 
> app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at 
> app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
> at 
> app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
> at 
> app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
> at 
> 

[jira] [Created] (KAFKA-14997) JmxToolTest failing with initializationError

2023-05-13 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14997:
-

 Summary: JmxToolTest failing with initializationError
 Key: KAFKA-14997
 URL: https://issues.apache.org/jira/browse/KAFKA-14997
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Federico Valeri


Noticed that JmxToolTest fails with:

```
h4. Error
java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: 
javax.naming.ServiceUnavailableException [Root exception is 
java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested 
exception is: 
 java.net.ConnectException: Connection timed out (Connection timed out)]
h4. Stacktrace
java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: 
javax.naming.ServiceUnavailableException [Root exception is 
java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested 
exception is: 
 java.net.ConnectException: Connection timed out (Connection timed out)]
 at 
javax.management.remote.rmi.RMIConnectorServer.newIOException(RMIConnectorServer.java:827)
 at 
javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:432)
 at org.apache.kafka.tools.JmxToolTest.startJmxAgent(JmxToolTest.java:337)
 at org.apache.kafka.tools.JmxToolTest.beforeAll(JmxToolTest.java:55)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:70)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$13(ClassBasedTestDescriptor.java:411)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:409)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:215)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:84)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
 at java.util.ArrayList.forEach(ArrayList.java:1259)
 at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
 at 

[jira] [Updated] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs

2023-05-07 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao updated KAFKA-14971:
--
Description: 
The test testSyncTopicConfigs in 
`org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs`
 seems to be flaky. Found here: 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]

Ran it locally against the [same PR|https://github.com/apache/kafka/pull/13594] and it passed.

 

 
{code:java}
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>

Stacktrace
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153)
at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758)
at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325)
at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306)
at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296)
at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752)
at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
at 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at 

[jira] [Created] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs

2023-05-07 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14971:
-

 Summary: Flaky Test 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
 Key: KAFKA-14971
 URL: https://issues.apache.org/jira/browse/KAFKA-14971
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


The test testSyncTopicConfigs in 
`org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs`
 seems to be flaky. Found here: 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]

 

Ran it locally against the [same PR|https://github.com/apache/kafka/pull/13594] and it passed.

 

```
h4. Error
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
h4. Stacktrace
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
 at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
 at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
 at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153)
 at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758)
 at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325)
 at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296)
 at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
 at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
 at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
 at 
