[jira] [Resolved] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation
[ https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-15041.
---
Resolution: Won't Fix

For now, setting the config `producer.override.max.block.ms` at the connector config level, or `producer.max.block.ms` at the worker config level, to a lower value should fix this issue. The problem is that the default value for this config is [set to Long.MAX_VALUE|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L820], and when topics are deleted manually there is really no signal received to indicate it. We could add heuristics, like periodically checking whether a topic is present and refreshing the cache, or checking the source topic metrics to see whether records are just being buffered and never sent, but that's outside the scope of the runtime.

> Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation
> --
>
> Key: KAFKA-15041
> URL: https://issues.apache.org/jira/browse/KAFKA-15041
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: Sagar Rao
> Assignee: Sagar Rao
> Priority: Major
>
> [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] allows source connectors to create topics even when the broker doesn't allow them to do so. It does so by checking, for every record, whether [a topic needs to be created|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]. To avoid checking for topic presence via the admin client on every record, it also maintains a cache of the topics it has created and doesn't try to create those again. This helps create topics when brokers don't support automatic topic creation.
> However, let's say the topic gets created initially and later gets deleted while the connector is still running, and the brokers don't support automatic topic creation. In such cases, the connector has cached the topic it already created and won't recreate it, because the cache is never updated; and since the broker doesn't support topic creation, the logs would just be full of messages like:
>
> {code:java}
> Error while fetching metadata with correlation id 3260 : {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code}
>
> This can become a problem in environments where brokers don't allow topic creation. We need a way to refresh the topic cache for such cases.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
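The heuristic mentioned in the resolution, periodically re-checking topic presence and refreshing the cache, could be sketched as a cache whose entries expire. This is a hypothetical illustration with invented class and method names, not Connect's actual cache:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a topic-creation cache whose entries expire, forcing the
// task to re-check topic existence (and re-create it if it was deleted) once the
// TTL passes. Names are invented for illustration; this is not Connect's code.
class ExpiringTopicCache {
    private final Map<String, Long> createdAt = new ConcurrentHashMap<>();
    private final long ttlMs;

    ExpiringTopicCache(long ttlMs) {
        this.ttlMs = ttlMs;
    }

    // Record that the topic was just created (or verified to exist).
    void markCreated(String topic, long nowMs) {
        createdAt.put(topic, nowMs);
    }

    // True if the topic was created/verified within the TTL window; once the
    // entry expires it is evicted, so the caller re-checks via the admin client.
    boolean isFresh(String topic, long nowMs) {
        Long ts = createdAt.get(topic);
        if (ts == null || nowMs - ts > ttlMs) {
            createdAt.remove(topic);
            return false;
        }
        return true;
    }
}
```

A task would consult `isFresh` before sending and fall back to its topic-creation path when it returns false; a manually deleted topic would then be recreated within one TTL instead of never.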
[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
[ https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-14401:
---
Description: When a connector or task tries to read offsets from the offsets topic, it invokes the `OffsetStorageReaderImpl#offsets` method. This method gets a Future from the underlying KafkaOffsetBackingStore, which invokes `KafkaBasedLog#readToEnd` and passes a Callback. That method essentially adds the Callback to a queue of callbacks being managed. Within KafkaBasedLog there is a WorkThread which polls the callback queue and executes the callbacks, and it does this in an infinite loop. However, there is an enclosing try/catch block around the while loop. If an exception is thrown that is not caught by any of the inner catch blocks, control reaches the outermost catch block and the WorkThread terminates. The connectors/tasks are not aware of this, and they keep submitting callbacks to KafkaBasedLog with nobody processing them.
This can be seen in the thread dumps as well:
{code:java}
"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x7f8d9c037000 nid=0x5d00 waiting on condition [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
	- parking to wait for <0x00070345c9a8> (a java.util.concurrent.CountDownLatch$Sync)
	at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
	at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
	at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
	at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
	at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
{code}
We need a mechanism to fail all such offset read requests. Even if we restart the thread, chances are it will still fail with the same error, so the offset fetch would be stuck perennially. As already explained, this scenario happens mainly when the exception thrown is not caught by any of the inner catch blocks and control lands in the outermost catch block.
In my experience, I have seen this situation happen on a few occasions, when the exception thrown is:
{code:java}
[2022-11-20 09:00:59,307] ERROR Unexpected exception in Thread[KafkaBasedLog Work Thread - connect-offsets,5,main] (org.apache.kafka.connect.util.KafkaBasedLog:440)
org.apache.kafka.connect.errors.ConnectException: Error while getting end offsets for topic 'connect-offsets' on brokers at XXX
	at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:695)
	at org.apache.kafka.connect.util.KafkaBasedLog.readEndOffsets(KafkaBasedLog.java:371)
	at org.apache.kafka.connect.util.KafkaBasedLog.readToLogEnd(KafkaBasedLog.java:332)
	at org.apache.kafka.connect.util.KafkaBasedLog.access$400(KafkaBasedLog.java:75)
	at org.apache.kafka.connect.util.KafkaBasedLog$WorkThread.run(KafkaBasedLog.java:406)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at org.apache.kafka.connect.util.TopicAdmin.endOffsets(TopicAdmin.java:672)
	... 4 more
{code}
At this point the WorkThread is dead once control leaves the [catch block|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java#L608-L610], and we can find the line `Unexpected exception in` in the logs. Another example could be when the worker is already OOM; in such cases as well the work thread would die. That is not a good example, because once the worker is OOM we can't make progress anyway, but I'm adding it for completeness.
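The proposed mechanism, failing all pending offset-read requests when the work thread dies, could be sketched as follows. This is a hypothetical illustration with invented names, not KafkaBasedLog's actual code:

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: if the work loop dies from an uncaught exception,
// complete every queued future exceptionally instead of leaving callers
// parked on a latch forever. Names are invented for illustration.
class FailPendingOnDeath {
    final Queue<CompletableFuture<Void>> pending = new ConcurrentLinkedQueue<>();

    // Stand-in for WorkThread.run(): `body` represents the poll/readToLogEnd loop.
    void runLoop(Runnable body) {
        try {
            body.run();
        } catch (Throwable t) {
            // The crucial addition: drain and fail all pending callbacks so
            // blocked offset reads return an error instead of hanging.
            CompletableFuture<Void> f;
            while ((f = pending.poll()) != null) {
                f.completeExceptionally(t);
            }
        }
    }
}
```

Callers blocked in a `get()` would then receive the failure immediately, rather than waiting on a CountDownLatch that will never be counted down.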
[jira] [Commented] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs
[ https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17840163#comment-17840163 ] Sagar Rao commented on KAFKA-16604:
---
[~chia7712], ok, I have assigned it to myself.

> Deprecate ConfigDef.ConfigKey constructor from public APIs
> --
>
> Key: KAFKA-16604
> URL: https://issues.apache.org/jira/browse/KAFKA-16604
> Project: Kafka
> Issue Type: Improvement
> Reporter: Sagar Rao
> Assignee: Sagar Rao
> Priority: Major
>
> Currently, one can create a ConfigKey either by invoking the public constructor directly and passing it to a ConfigDef object, or by invoking one of the define methods. The two ways can get confusing at times. Moreover, it can lead to errors, as was noticed in KAFKA-16592.
> We should ideally expose only one way to users, which IMO should be to create the objects only through the exposed define methods. This ticket is about marking the public constructor of ConfigKey as Deprecated first and then making it private eventually.
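The single-entry-point design the ticket argues for can be sketched in miniature. This is an invented illustration, not the real ConfigDef API: keeping the key's constructor private and exposing only a chainable define() method means the constructor's signature can evolve without breaking external callers:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Invented sketch, not the real ConfigDef: only define() can construct a Key,
// so the Key constructor's signature is free to change between versions.
class KeyDef {
    static final class Key {
        final String name;
        final String doc;

        private Key(String name, String doc) { // private: not part of the public API
            this.name = name;
            this.doc = doc;
        }
    }

    private final Map<String, Key> keys = new LinkedHashMap<>();

    // The only supported way to create keys, chainable like ConfigDef.define().
    KeyDef define(String name, String doc) {
        keys.put(name, new Key(name, doc));
        return this;
    }

    Key get(String name) {
        return keys.get(name);
    }
}
```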
[jira] [Assigned] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs
[ https://issues.apache.org/jira/browse/KAFKA-16604?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-16604:
---
Assignee: Sagar Rao
[jira] [Created] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs
Sagar Rao created KAFKA-16604:
---
Summary: Deprecate ConfigDef.ConfigKey constructor from public APIs
Key: KAFKA-16604
URL: https://issues.apache.org/jira/browse/KAFKA-16604
Project: Kafka
Issue Type: Improvement
Reporter: Sagar Rao

Currently, one can create a ConfigKey either by invoking the public constructor directly and passing it to a ConfigDef object, or by invoking one of the define methods. The two ways can get confusing at times. Moreover, it can lead to errors, as was noticed in KAFKA-16592.

We should ideally expose only one way to users, which IMO should be to create the objects only through the exposed define methods. This ticket is about marking the public constructor of ConfigKey as Deprecated first and then making it private eventually.
[jira] [Created] (KAFKA-16592) ConfigKey constructor update can break clients using it
Sagar Rao created KAFKA-16592:
---
Summary: ConfigKey constructor update can break clients using it
Key: KAFKA-16592
URL: https://issues.apache.org/jira/browse/KAFKA-16592
Project: Kafka
Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao

In [KAFKA-14957|https://issues.apache.org/jira/browse/KAFKA-14957], the constructor of ConfigDef.ConfigKey was updated to add a new argument called *alternativeString*. As part of the PR, new *define* methods were also added, which makes sense. However, since the constructor of *ConfigDef.ConfigKey* itself can be used directly by other clients which import the dependency, this can break all clients that were using the older constructor without the *alternativeString* argument.

I bumped into this when I was testing the [kafka-connect-redis|https://github.com/jcustenborder/kafka-connect-redis/tree/master] connector. It starts up correctly against the official 3.7 release, but fails with the following error when run against a 3.8 snapshot:
{code:java}
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(Ljava/lang/String;Lorg/apache/kafka/common/config/ConfigDef$Type;Ljava/lang/Object;Lorg/apache/kafka/common/config/ConfigDef$Validator;Lorg/apache/kafka/common/config/ConfigDef$Importance;Ljava/lang/String;Ljava/lang/String;ILorg/apache/kafka/common/config/ConfigDef$Width;Ljava/lang/String;Ljava/util/List;Lorg/apache/kafka/common/config/ConfigDef$Recommender;Z)V
	at com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder.build(ConfigKeyBuilder.java:62)
	at com.github.jcustenborder.kafka.connect.redis.RedisConnectorConfig.config(RedisConnectorConfig.java:133)
	at com.github.jcustenborder.kafka.connect.redis.RedisSinkConnectorConfig.config(RedisSinkConnectorConfig.java:46)
	at com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector.config(RedisSinkConnector.java:73)
	at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:538)
	at org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$3(AbstractHerder.java:412)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
{code}
The reason is that the connector uses another library called connect-utils, which invokes the old constructor [directly|https://github.com/jcustenborder/connect-utils/blob/master/connect-utils/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/ConfigKeyBuilder.java#L62].

Connector invocations are not expected to fail across versions, so this would cause confusion. One could ask why the constructor is being invoked directly instead of using the *define* methods, but there might be other clients doing the same. We should add the old constructor back so that it calls the new one with *alternativeString* set to null.
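The compatibility fix the ticket proposes, restoring the old constructor as an overload that delegates to the new one, can be illustrated with a simplified stand-in. ConfigKeySketch below is an invented class, not the real ConfigDef.ConfigKey:

```java
// Invented stand-in for ConfigDef.ConfigKey: restoring the old constructor as
// an overload that delegates to the new one keeps bytecode compiled against
// the old signature linking correctly (no NoSuchMethodError).
class ConfigKeySketch {
    final String name;
    final String alternativeString; // stand-in for the argument added in KAFKA-14957

    // New constructor with the extra argument.
    ConfigKeySketch(String name, String alternativeString) {
        this.name = name;
        this.alternativeString = alternativeString;
    }

    // Restored old constructor: existing callers resolve this exact signature
    // at link time, and it forwards with alternativeString = null.
    ConfigKeySketch(String name) {
        this(name, null);
    }
}
```

Because NoSuchMethodError is a link-time failure, the only alternative is recompiling every caller against the new signature; keeping the old overload avoids forcing that on clients.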
[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609 ] Sagar Rao edited comment on KAFKA-16578 at 4/18/24 3:02 PM:
---
[~kirktrue], I am running the system tests for connect using the test suite on my local machine, and this exact test *test_exactly_once_source* fails regularly for me for a different config.
{code:java}
test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer
status: FAIL
run time: 7 minutes 59.232 seconds
InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0')
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run
    data = self.run_test()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test
    return self.test_context.function(self.test)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper
    return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
  File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source
    consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties)
  File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__
    BackgroundThreadService.__init__(self, context, num_nodes)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__
    super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
    self.allocate_nodes()
  File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes
    self.nodes = self.cluster.alloc(self.cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc
    allocated = self.do_alloc(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc
    good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec)
  File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec
    raise InsufficientResourcesError(err)
ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0
{code}
-If you want, I can revert the change as part of my PR. It should be ready for review by today or tomorrow.- I updated the PR [https://github.com/apache/kafka/pull/15594] with the changes rolled back for the test in question. I hope it's ok.
[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609 ] Sagar Rao edited comment on KAFKA-16578 at 4/18/24 11:11 AM: - [~kirktrue], I am running the the system tests for connect using the test suite on my local, and this exact test *test_exactly_once_source* fails regularly for me for a different config. {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 7 minutes 59.232 seconds InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties) File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__ self.allocate_nodes() File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes self.nodes = self.cluster.alloc(self.cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc allocated = self.do_alloc(cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec raise InsufficientResourcesError(err) ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0 {code} If you want, I can revert the change as part of my PR. It should be ready for review by today or tomorrow. was (Author: sagarrao): [~kirktrue], I am working on modifying the the system tests for connect, and this exact test *test_exactly_once_source* fails regularly for me for a different config. {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 7 minutes 59.232 seconds InsufficientResourcesError('linux nodes requested: 1. 
linux nodes available: 0') Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties) File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in
[jira] [Comment Edited] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609 ] Sagar Rao edited comment on KAFKA-16578 at 4/18/24 11:10 AM: - [~kirktrue], I am working on modifying the the system tests for connect, and this exact test *test_exactly_once_source* fails regularly for me for a different config. {code:java} test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 7 minutes 59.232 seconds InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties) File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__ self.allocate_nodes() File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes self.nodes = self.cluster.alloc(self.cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc allocated = self.do_alloc(cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec raise InsufficientResourcesError(err) ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0 {code} If you want, I can revert the change as part of my PR. It should be ready for review by today or tomorrow. was (Author: sagarrao): [~kirktrue], I am working on modifying the the system tests for connect, and this exact test `test_exactly_once_source` fails regularly for me for a different config. ``` test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 7 minutes 59.232 seconds InsufficientResourcesError('linux nodes requested: 1. 
linux nodes available: 0') Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties) File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__
[jira] [Commented] (KAFKA-16578) Revert changes to connect_distributed_test.py for the new async Consumer
[ https://issues.apache.org/jira/browse/KAFKA-16578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17838609#comment-17838609 ] Sagar Rao commented on KAFKA-16578: --- [~kirktrue], I am working on modifying the system tests for connect, and this exact test `test_exactly_once_source` fails regularly for me with a different config. ``` test_id: kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=sessioned.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer status: FAIL run time: 7 minutes 59.232 seconds InsufficientResourcesError('linux nodes requested: 1. linux nodes available: 0') Traceback (most recent call last): File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 186, in _do_run data = self.run_test() File "/usr/local/lib/python3.9/dist-packages/ducktape/tests/runner_client.py", line 246, in run_test return self.test_context.function(self.test) File "/usr/local/lib/python3.9/dist-packages/ducktape/mark/_mark.py", line 433, in wrapper return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs) File "/opt/kafka-dev/tests/kafkatest/tests/connect/connect_distributed_test.py", line 929, in test_exactly_once_source consumer_validator = ConsoleConsumer(self.test_context, 1, self.kafka, self.source.topic, consumer_timeout_ms=1000, print_key=True, consumer_properties=consumer_properties) File "/opt/kafka-dev/tests/kafkatest/services/console_consumer.py", line 97, in __init__ BackgroundThreadService.__init__(self, context, num_nodes) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/background_thread.py", line 26, in __init__ super(BackgroundThreadService, self).__init__(context, num_nodes, cluster_spec, *args, **kwargs) File "/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 107, in __init__ self.allocate_nodes() File 
"/usr/local/lib/python3.9/dist-packages/ducktape/services/service.py", line 217, in allocate_nodes self.nodes = self.cluster.alloc(self.cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/cluster.py", line 54, in alloc allocated = self.do_alloc(cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/finite_subcluster.py", line 37, in do_alloc good_nodes, bad_nodes = self._available_nodes.remove_spec(cluster_spec) File "/usr/local/lib/python3.9/dist-packages/ducktape/cluster/node_container.py", line 131, in remove_spec raise InsufficientResourcesError(err) ducktape.cluster.node_container.InsufficientResourcesError: linux nodes requested: 1. linux nodes available: 0 ``` If you want, I can revert the change as part of my PR. It should be ready for review by today or tomorrow. > Revert changes to connect_distributed_test.py for the new async Consumer > > > Key: KAFKA-16578 > URL: https://issues.apache.org/jira/browse/KAFKA-16578 > Project: Kafka > Issue Type: Task > Components: clients, consumer, system tests >Affects Versions: 3.8.0 >Reporter: Kirk True >Assignee: Kirk True >Priority: Critical > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > To test the new, asynchronous Kafka {{Consumer}} implementation, we migrated > a slew of system tests to run both the "old" and "new" implementations. > KAFKA-16272 updated the system tests in {{connect_distributed_test.py}} so it > could test the new consumer with Connect. However, we are not supporting > Connect with the new consumer in 3.8.0. 
Unsurprisingly, when we run the > Connect system tests with the new {{AsyncKafkaConsumer}}, we get errors like > the following: > {code} > test_id: > kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_exactly_once_source.clean=False.connect_protocol=eager.metadata_quorum=ISOLATED_KRAFT.use_new_coordinator=True.group_protocol=consumer > status: FAIL > run time: 6 minutes 3.899 seconds > InsufficientResourcesError('Not enough nodes available to allocate. linux > nodes requested: 1. linux nodes available: 0') > Traceback (most recent call last): > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 184, in _do_run > data = self.run_test() > File > "/home/jenkins/workspace/system-test-kafka-branch-builder/kafka/venv/lib/python3.7/site-packages/ducktape/tests/runner_client.py", > line 262, in run_test > return self.test_context.function(self.test) > File >
[jira] [Assigned] (KAFKA-16481) Fix flaky kafka.server.ReplicaManagerTest#testRemoteLogReaderMetrics
[ https://issues.apache.org/jira/browse/KAFKA-16481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-16481: - Assignee: Sagar Rao > Fix flaky kafka.server.ReplicaManagerTest#testRemoteLogReaderMetrics > > > Key: KAFKA-16481 > URL: https://issues.apache.org/jira/browse/KAFKA-16481 > Project: Kafka > Issue Type: Test >Reporter: Chia-Ping Tsai >Assignee: Sagar Rao >Priority: Minor > > {quote} > org.opentest4j.AssertionFailedError: expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:183) > at > app//kafka.server.ReplicaManagerTest.testRemoteLogReaderMetrics(ReplicaManagerTest.scala:4089) > at > java.base@21.0.2/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103) > at java.base@21.0.2/java.lang.reflect.Method.invoke(Method.java:580) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:728) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:218) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:214) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:139) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:69) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > 
app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) > at > app//org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) > at > app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) > at > app//org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) > at
[jira] [Commented] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17832894#comment-17832894 ] Sagar Rao commented on KAFKA-16272: --- [~kirktrue], thanks, I changed the status to In Progress > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Sagar Rao >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrices. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-16272) Update connect_distributed_test.py to support KIP-848’s group protocol config
[ https://issues.apache.org/jira/browse/KAFKA-16272?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-16272: - Assignee: Sagar Rao > Update connect_distributed_test.py to support KIP-848’s group protocol config > - > > Key: KAFKA-16272 > URL: https://issues.apache.org/jira/browse/KAFKA-16272 > Project: Kafka > Issue Type: Test > Components: clients, consumer, system tests >Affects Versions: 3.7.0 >Reporter: Kirk True >Assignee: Sagar Rao >Priority: Blocker > Labels: kip-848-client-support, system-tests > Fix For: 3.8.0 > > > This task is to update the test method(s) in {{connect_distributed_test.py}} > to support the {{group.protocol}} configuration introduced in > [KIP-848|https://cwiki.apache.org/confluence/display/KAFKA/KIP-848%3A+The+Next+Generation+of+the+Consumer+Rebalance+Protocol] > by adding an optional {{group_protocol}} argument to the tests and matrixes. > See KAFKA-16231 as an example of how the test parameters can be changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
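For context, the knob these parametrized system tests toggle is KIP-848's consumer-side `group.protocol` setting. A minimal sketch of wiring it into consumer properties follows; the bootstrap address and group id are placeholders, and the accepted values ("classic" vs "consumer") should be verified against the Kafka release you run:

```java
import java.util.Properties;

// Sketch of the client-side config the group_protocol test parameter maps to.
// Property names follow KIP-848; values and broker address are placeholders.
public class GroupProtocolConfigSketch {
    public static Properties consumerProps(String groupProtocol) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "connect-test-group");      // placeholder
        // "classic" selects the old rebalance protocol, "consumer" the new one.
        props.put("group.protocol", groupProtocol);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("consumer").getProperty("group.protocol"));
    }
}
```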
[jira] [Updated] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.
[ https://issues.apache.org/jira/browse/KAFKA-16197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-16197: -- Description: When a Connect worker's poll timeout expires, the log lines that we see are: {noformat} consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. {noformat} and the reason for leaving the group is {noformat} Member XX sending LeaveGroup request to coordinator XX due to consumer poll timeout has expired. {noformat} which is specific to Consumers and not to Connect workers. The log line above is especially misleading because the config `max.poll.interval.ms` is not configurable for a Connect worker, and could make someone believe that the logs are being written for Sink Connectors and not for the Connect worker. Ideally, we should print something specific to Connect. was: When a worker's poll timeout expires in Connect, the log lines that we see are: {noformat} consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. {noformat} and the reason for leaving the group is {noformat} Member XX sending LeaveGroup request to coordinator XX due to consumer poll timeout has expired. {noformat} which is specific to Consumers and not to Connect workers. 
The log line above is especially misleading because the config `max.poll.interval.ms` is not configurable for a Connect worker, and could make someone believe that the logs are being written for Sink Connectors and not for the Connect worker. Ideally, we should print something specific to Connect. > Connect Worker poll timeout prints Consumer poll timeout specific warnings. > --- > > Key: KAFKA-16197 > URL: https://issues.apache.org/jira/browse/KAFKA-16197 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > When a Connect worker's poll timeout expires, the log lines that > we see are: > {noformat} > consumer poll timeout has expired. This means the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time processing > messages. You can address this either by increasing max.poll.interval.ms or > by reducing the maximum size of batches returned in poll() with > max.poll.records. > {noformat} > and the reason for leaving the group is > {noformat} > Member XX sending LeaveGroup request to coordinator XX due to consumer poll > timeout has expired. > {noformat} > which is specific to Consumers and not to Connect workers. The log line above > is especially misleading because the config `max.poll.interval.ms` is not > configurable for a Connect worker and could make someone believe that the > logs are being written for Sink Connectors and not for the Connect worker. > Ideally, we should print something specific to Connect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
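To make the suggestion concrete, here is an illustrative, stdlib-only sketch of emitting a Connect-specific warning instead of the consumer-oriented one. The helper, its wording, and the reference to `rebalance.timeout.ms` are assumptions for illustration only, not code or log messages from the Connect runtime:

```java
// Hypothetical helper, NOT Connect runtime code: chooses a message that
// points at worker-relevant concepts instead of consumer-only configs.
public class PollTimeoutMessageSketch {
    public static String pollTimeoutWarning(boolean isConnectWorker) {
        if (isConnectWorker) {
            // Assumption: the worker's group membership poll interval is
            // governed by rebalance.timeout.ms rather than max.poll.interval.ms.
            return "worker poll timeout has expired. The worker's group tick loop "
                 + "stalled longer than rebalance.timeout.ms allows; check for "
                 + "blocked herder or rebalance operations.";
        }
        // Paraphrase of the existing consumer-oriented message.
        return "consumer poll timeout has expired. Consider increasing "
             + "max.poll.interval.ms or reducing max.poll.records.";
    }

    public static void main(String[] args) {
        System.out.println(pollTimeoutWarning(true));
    }
}
```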
[jira] [Updated] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.
[ https://issues.apache.org/jira/browse/KAFKA-16197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-16197: -- Component/s: connect > Connect Worker poll timeout prints Consumer poll timeout specific warnings. > --- > > Key: KAFKA-16197 > URL: https://issues.apache.org/jira/browse/KAFKA-16197 > Project: Kafka > Issue Type: Bug > Components: connect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > When a worker's poll timeout expires in Connect, the log lines that we see > are: > {noformat} > consumer poll timeout has expired. This means the time between subsequent > calls to poll() was longer than the configured max.poll.interval.ms, which > typically implies that the poll loop is spending too much time processing > messages. You can address this either by increasing max.poll.interval.ms or > by reducing the maximum size of batches returned in poll() with > max.poll.records. > {noformat} > and the reason for leaving the group is > {noformat} > Member XX sending LeaveGroup request to coordinator XX due to consumer poll > timeout has expired. > {noformat} > which is specific to Consumers and not to Connect workers. The log line above > is especially misleading because the config `max.poll.interval.ms` is not > configurable for a Connect worker and could make someone believe that the > logs are being written for Sink Connectors and not for the Connect worker. > Ideally, we should print something specific to Connect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.
Sagar Rao created KAFKA-16197: - Summary: Connect Worker poll timeout prints Consumer poll timeout specific warnings. Key: KAFKA-16197 URL: https://issues.apache.org/jira/browse/KAFKA-16197 Project: Kafka Issue Type: Bug Reporter: Sagar Rao Assignee: Sagar Rao When a worker's poll timeout expires in Connect, the log lines that we see are: {noformat} consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. {noformat} and the reason for leaving the group is {noformat} Member XX sending LeaveGroup request to coordinator XX due to consumer poll timeout has expired. {noformat} which is specific to Consumers and not to Connect workers. The log line above is especially misleading because the config `max.poll.interval.ms` is not configurable for a Connect worker and could make someone believe that the logs are being written for Sink Connectors and not for the Connect worker. Ideally, we should print something specific to Connect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16039) RecordHeaders supports the addAll method
[ https://issues.apache.org/jira/browse/KAFKA-16039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-16039: -- Labels: need-kip (was: ) > RecordHeaders supports the addAll method > > > Key: KAFKA-16039 > URL: https://issues.apache.org/jira/browse/KAFKA-16039 > Project: Kafka > Issue Type: Improvement > Components: clients >Reporter: Jianbin Chen >Priority: Minor > Labels: need-kip > > Why not provide an addAll method in RecordHeaders? This will help reduce the > amount of code required to copy between headers -- This message was sent by Atlassian Jira (v8.20.10#820010)
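To make the request concrete: today callers copy headers between instances with an explicit loop, and the asked-for `addAll` would collapse that into one call. Below is a stdlib-only sketch using simplified stand-ins for the Kafka `Header`/`Headers` types (the real interfaces carry `byte[]` values and, as the need-kip label notes, changing them would require a KIP):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for org.apache.kafka.common.header.Header/Headers.
// Everything below is illustrative, not the kafka-clients API.
public class HeadersSketch {
    public record Header(String key, String value) {}

    public static class Headers {
        private final List<Header> headers = new ArrayList<>();

        public Headers add(Header header) {
            headers.add(header);
            return this;
        }

        // Today, copying between Headers instances means an explicit loop:
        public Headers addEach(Iterable<Header> source) {
            for (Header h : source) {
                add(h);
            }
            return this;
        }

        // The requested convenience collapses that loop into a single call:
        public Headers addAll(Headers source) {
            headers.addAll(source.headers);
            return this;
        }

        public int size() {
            return headers.size();
        }
    }

    public static void main(String[] args) {
        Headers original = new Headers()
                .add(new Header("trace-id", "abc"))
                .add(new Header("schema-id", "42"));
        Headers copy = new Headers().addAll(original);
        System.out.println(copy.size()); // 2
    }
}
```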
[jira] [Created] (KAFKA-16056) Worker poll timeout expiry can lead to Duplicate task assignments.
Sagar Rao created KAFKA-16056: - Summary: Worker poll timeout expiry can lead to Duplicate task assignments. Key: KAFKA-16056 URL: https://issues.apache.org/jira/browse/KAFKA-16056 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sagar Rao Assignee: Sagar Rao When a poll timeout expiry happens for a worker, it triggers a rebalance because it leaves the group proactively. Under normal scenarios, leaving the group would trigger a scheduled rebalance delay. However, one thing to note is that the worker which left the group temporarily doesn't give up its assignments, and whatever tasks were running on it would remain as is. When the scheduled rebalance delay elapses, it would just get back its assignments, but given that there won't be any revocations, it should all work out fine. But there is an edge case here. Let's assume that a scheduled rebalance delay was already active on a group and, just before a follow-up rebalance due to the scheduled rebalance delay elapsing, one of the workers' poll timeout expires. At this point, a rebalance is imminent and the leader would track the assignments of the transiently departed worker as lost [here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255] . When [handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441] gets triggered, because the scheduledRebalance delay isn't reset yet and if [this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473] condition passes, the leader would assume that it needs to reassign all the lost assignments, which it will. 
But because the worker whose poll timeout expired doesn't rescind its assignments, we would end up noticing duplicate assignments: one set on the original worker which was already running the tasks and connectors, and another set on the remaining group of workers which got the redistributed work. This could lead to task failures if a connector has been written in a way which expects no duplicate tasks running across a cluster. Also, this edge case can be encountered more frequently if the `rebalance.timeout.ms` config is set to a lower value. One of the approaches could be to do something similar to https://issues.apache.org/jira/browse/KAFKA-9184 where, upon coordinator discovery failure, the worker gives up all its assignments and joins with an empty assignment. We could do something similar in this case as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
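The race described above reduces to simple set arithmetic: the tasks the departed worker keeps running, intersected with the tasks the leader redistributes as "lost". The class and names below are a toy model of that overlap, not Connect runtime code:

```java
import java.util.HashSet;
import java.util.Set;

// Toy model of the KAFKA-16056 edge case: a worker whose poll timeout expired
// leaves the group but keeps running its tasks; if the leader hands those
// tasks out again as lost before the scheduled rebalance delay resets, two
// workers own the same tasks. Illustrative names, not the Connect API.
public class DuplicateAssignmentSketch {
    public static Set<String> overlap(Set<String> retainedByDepartedWorker,
                                      Set<String> reassignedByLeader) {
        Set<String> duplicated = new HashSet<>(retainedByDepartedWorker);
        duplicated.retainAll(reassignedByLeader);
        return duplicated;
    }

    public static void main(String[] args) {
        // The departed worker never rescinded task-0 and task-1 ...
        Set<String> retained = Set.of("task-0", "task-1");
        // ... but the leader treated them as lost and redistributed them.
        Set<String> reassigned = Set.of("task-0", "task-1");
        System.out.println("duplicated: " + overlap(retained, reassigned));
        // KAFKA-9184-style fix: the worker rejoins with an empty assignment,
        // so nothing is retained and the overlap vanishes.
        System.out.println("after fix: " + overlap(Set.of(), reassigned));
    }
}
```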
[jira] [Commented] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individ
[ https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796108#comment-17796108 ] Sagar Rao commented on KAFKA-14454: --- Aah ok, no problem at all :) Good to know that it worked and that we don't need to reopen this ticket. > KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions > passes when run individually but not when run as part of the IT > -- > > Key: KAFKA-14454 > URL: https://issues.apache.org/jira/browse/KAFKA-14454 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > Newly added test > KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions > as part of KIP-837 passes when run individually but fails when run as part of the IT > class and hence is marked as Ignored. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individ
[ https://issues.apache.org/jira/browse/KAFKA-14454?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17796095#comment-17796095 ] Sagar Rao commented on KAFKA-14454: --- [~cadonna], I don't recall the error, and I tried to find it but couldn't. I should have added the actual error as part of this ticket. We could reopen this, but I didn't see this test failing on Jenkins either. > KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions > passes when run individually but not when run as part of the IT > -- > > Key: KAFKA-14454 > URL: https://issues.apache.org/jira/browse/KAFKA-14454 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > Newly added test > KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions > as part of KIP-837 passes when run individually but fails when run as part of the IT > class and hence is marked as Ignored. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10873) Inaccurate "Ignoring stop request for unowned connector" log messages
[ https://issues.apache.org/jira/browse/KAFKA-10873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17787561#comment-17787561 ] Sagar Rao commented on KAFKA-10873: --- FWIW, we hit a scenario today where the same sequence of logs keep getting emitted. > Inaccurate "Ignoring stop request for unowned connector" log messages > - > > Key: KAFKA-10873 > URL: https://issues.apache.org/jira/browse/KAFKA-10873 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Priority: Major > > If a connector fails during startup, it will never be added to the worker's > internal list of running connectors (see > [https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L298|https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L298).]), > and the same happens for tasks (see > [https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L570]). 
> > This leads to the following {{WARN}}-level log messages when that > connector/task is scheduled to be stopped by the worker: > * [Ignoring stop request for unowned connector > |https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L390] > * [Ignoring await stop request for non-present connector > |https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L415] > * [Ignoring stop request for unowned task > |https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L832] > * [Ignoring await stop request for non-present task > |https://github.com/apache/kafka/blob/87260a33b01590d0f73577840422e24fa589bed0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L862] > If the connector/task failed on startup, there should already be log messages > detailing the failure and its cause; there is no need to emit warning > messages about not stopping that connector when it is scheduled for shutdown. > Even worse, emitting these messages may cause users to believe that their > cluster is experiencing a rebalancing bug that is somehow causing > connectors/tasks that are not assigned to a worker to be revoked from it. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15770) org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786693#comment-17786693 ] Sagar Rao commented on KAFKA-15770: --- Noticed an instance here: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14777/1/tests > org.apache.kafka.streams.integration.ConsistencyVectorIntegrationTest.shouldHaveSamePositionBoundActiveAndStandBy > is flaky > --- > > Key: KAFKA-15770 > URL: https://issues.apache.org/jira/browse/KAFKA-15770 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Reporter: Alok Thatikunta >Priority: Major > > Test fails on CI, passes locally > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-14607/9/testReport/junit/org.apache.kafka.streams.integration/ConsistencyVectorIntegrationTest/Build___JDK_17_and_Scala_2_13___shouldHaveSamePositionBoundActiveAndStandBy/] > {code:java} > java.lang.AssertionError: > Result:SucceededQueryResult{result=<0,1698511250443>, executionInfo=[], > position=Position{position={input-topic={0=50 > Expected: is > but: was {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15687) Update host address for the GroupMetadata when replacing static members
[ https://issues.apache.org/jira/browse/KAFKA-15687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779863#comment-17779863 ] Sagar Rao commented on KAFKA-15687: --- Thanks for filing this bug [~LucentWong]. You have pointed to the correct piece of code, and we indeed aren't updating the clientId. While it does seem confusing to see the same client-id even when a static member gets replaced, I believe the member id does change upon replacement. Do you think that is sufficient for this case? Or does the client id hold any particular importance in your use case such that it is mandatory to change it? > Update host address for the GroupMetadata when replacing static members > > > Key: KAFKA-15687 > URL: https://issues.apache.org/jira/browse/KAFKA-15687 > Project: Kafka > Issue Type: Improvement > Components: group-coordinator >Affects Versions: 3.6.0 >Reporter: Yu Wang >Priority: Major > > We are trying to use the static membership protocol for our consumers in > Kubernetes. When our pod was recreated, we found that the host address in the > group description does not change to the address of the newly created pod. > For example, we have one pod with *group.instance.id = id1 and ip = > 192.168.0.1*; when the pod crashes, we replace it with a new pod with the > same *group.instance.id = id1* but a different {*}ip = 192.168.0.2{*}. After > the new pod joins the consumer group, the "describe group" command > still shows the host as {*}192.168.0.1{*}. This means we cannot find the > correct consumer instance when investigating an issue. > After reading the source code, we found that the groupCoordinator does not > change the host address for the same {*}groupInstanceId{*}. > [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L316] > Is it also possible to replace the host address when replacing the static > member? > -- This message was sent by Atlassian Jira (v8.20.10#820010)
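The reported behavior can be modeled in a few lines: an entry keyed by `group.instance.id` whose host field is only refreshed if the replacement path copies it over. This is a simplified illustration of the two behaviors being discussed, not `GroupMetadata.scala` itself:

```java
import java.util.HashMap;
import java.util.Map;

// Toy reproduction of KAFKA-15687: when a static member (same
// group.instance.id) rejoins from a new host, the coordinator keeps the old
// host string unless the replacement path updates it. Illustrative only.
public class StaticMemberHostSketch {
    static class Member {
        final String groupInstanceId;
        String clientHost;
        Member(String id, String host) {
            this.groupInstanceId = id;
            this.clientHost = host;
        }
    }

    private final Map<String, Member> byInstanceId = new HashMap<>();

    // Reported behavior: the existing entry, and its stale host, is kept.
    public void joinKeepingOldHost(String instanceId, String newHost) {
        byInstanceId.putIfAbsent(instanceId, new Member(instanceId, newHost));
    }

    // Requested behavior: refresh the host when the member is replaced.
    public void joinUpdatingHost(String instanceId, String newHost) {
        byInstanceId.compute(instanceId, (id, member) -> {
            if (member == null) return new Member(id, newHost);
            member.clientHost = newHost;
            return member;
        });
    }

    public String hostOf(String instanceId) {
        return byInstanceId.get(instanceId).clientHost;
    }

    public static void main(String[] args) {
        StaticMemberHostSketch group = new StaticMemberHostSketch();
        group.joinKeepingOldHost("id1", "192.168.0.1");
        group.joinKeepingOldHost("id1", "192.168.0.2"); // pod replaced
        System.out.println(group.hostOf("id1")); // 192.168.0.1 (stale)
        group.joinUpdatingHost("id1", "192.168.0.2");
        System.out.println(group.hostOf("id1")); // 192.168.0.2
    }
}
```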
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779839#comment-17779839 ] Sagar Rao commented on KAFKA-13152: --- Thanks [~ableegoldman], there are merge conflicts which need resolution. I will ping you once those are resolved. It being 3rd in your pecking order of KIPs is fine with me. And yes, the config-related changes have been merged and released. It's only the actual buffering part that remains. > Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with > "{statestore.cache}/{input.buffer}.max.bytes" > - > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > The current config "buffered.records.per.partition" controls the maximum > number of records to bookkeep, and when it is exceeded we pause fetching from > this partition. However, this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", > which controls how many bytes in total are allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
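For reference, the config swap the ticket describes looks roughly like this in Streams properties. `statestore.cache.max.bytes` shipped with the KIP's config changes; treating `input.buffer.max.bytes` as settable is an assumption here (per the comment above, the actual buffering behavior is still pending), so verify both names against your Kafka Streams version:

```java
import java.util.Properties;

// Sketch of migrating from the deprecated per-partition/cache configs to the
// global byte-based ones proposed by the KIP. Config names are from the
// ticket title; availability depends on the Kafka Streams release.
public class StreamsBufferConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Old configs the KIP deprecates:
        // props.put("cache.max.bytes.buffering", "10485760");
        // props.put("buffered.records.per.partition", "1000");
        // New configs, bounded globally in bytes regardless of partition count:
        props.put("statestore.cache.max.bytes", String.valueOf(10 * 1024 * 1024));
        props.put("input.buffer.max.bytes", String.valueOf(512 * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("statestore.cache.max.bytes")); // 10485760
    }
}
```

The key point of the migration is that a byte-based global bound stays fixed as partitions are reassigned, which a per-partition record count cannot guarantee.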
[jira] [Assigned] (KAFKA-13976) Improvements for OpenAPI specs
[ https://issues.apache.org/jira/browse/KAFKA-13976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-13976: - Assignee: (was: Sagar Rao) > Improvements for OpenAPI specs > -- > > Key: KAFKA-13976 > URL: https://issues.apache.org/jira/browse/KAFKA-13976 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Mickael Maison >Priority: Major > > Via KAFKA-13780, we're adding support for generating an OpenAPI spec for the > Connect REST API. > To improve the spec we should: > - Avoid returning Response and instead have types with a structure > - Document all response codes -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15237) Implement write operation timeout
[ https://issues.apache.org/jira/browse/KAFKA-15237?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15237: - Assignee: Sagar Rao (was: David Jacot) > Implement write operation timeout > - > > Key: KAFKA-15237 > URL: https://issues.apache.org/jira/browse/KAFKA-15237 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Sagar Rao >Priority: Major > > In the Scala code, we rely on `offsets.commit.timeout.ms` to bound all the > writes. We should do the same in the new code. This is important to ensure > that the number of pending responses in the purgatory is bounded. The name of > the config is not ideal but we should keep it for backward compatibility > reasons. -- This message was sent by Atlassian Jira (v8.20.10#820010)
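For reference, the broker-side config the ticket reuses looks like this (the value is illustrative, not a recommendation):

```properties
# Bounds how long an offset-commit (and, per this ticket, other group
# coordinator writes) may wait in the purgatory before timing out.
offsets.commit.timeout.ms=5000
```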
[jira] [Assigned] (KAFKA-15454) Add support for OffsetCommit version 9 in admin client
[ https://issues.apache.org/jira/browse/KAFKA-15454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15454: - Assignee: Sagar Rao > Add support for OffsetCommit version 9 in admin client > -- > > Key: KAFKA-15454 > URL: https://issues.apache.org/jira/browse/KAFKA-15454 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: David Jacot >Assignee: Sagar Rao >Priority: Minor > Labels: kip-848, kip-848-client-support, kip-848-preview > > We need to handle the new error codes as specified here: > [https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitResponse.json#L46|https://github.com/apache/kafka/blob/trunk/clients/src/main/resources/common/message/OffsetCommitRequest.json#L35] -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15457) Add support for OffsetFetch version 9 in admin
[ https://issues.apache.org/jira/browse/KAFKA-15457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15457: - Assignee: Sagar Rao > Add support for OffsetFetch version 9 in admin > -- > > Key: KAFKA-15457 > URL: https://issues.apache.org/jira/browse/KAFKA-15457 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: David Jacot >Assignee: Sagar Rao >Priority: Minor > Labels: kip-848, kip-848-client-support, kip-848-preview > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775716#comment-17775716 ] Sagar Rao commented on KAFKA-13152: --- Makes sense Stanislav! If we can have experts from Streams to review this, I would be more than happy to revive this. > Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with > "{statestore.cache}/{input.buffer}.max.bytes" > - > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775701#comment-17775701 ] Sagar Rao commented on KAFKA-13152: --- [~enether], that's correct.. the reviews are pending for this. I saw the PR and there are merge conflicts as well. Will need to resolve those.. > Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with > "{statestore.cache}/{input.buffer}.max.bytes" > - > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-13152) Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17775701#comment-17775701 ] Sagar Rao edited comment on KAFKA-13152 at 10/16/23 11:15 AM: -- [~enether], that's correct.. the reviews are pending for this. I saw the PR and there are merge conflicts as well. I Will need to resolve those.. was (Author: sagarrao): [~enether], that's correct.. the reviews are pending for this. I saw the PR and there are merge conflicts as well. Will need to resolve those.. > Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with > "{statestore.cache}/{input.buffer}.max.bytes" > - > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: kip > Fix For: 3.7.0 > > > The current config "buffered.records.per.partition" controls how many records > in maximum to bookkeep, and hence it is exceed we would pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed is dependent on > the dynamic number of partitions assigned. > * Record size could vary from case to case. > And hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config with a global, e.g. "input.buffer.max.bytes" > which controls how much bytes in total is allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15020) integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor test is flaky
[ https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17772105#comment-17772105 ] Sagar Rao commented on KAFKA-15020: --- I found the original stacktrace is showing up in one of the recent builds: https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-14432/8/tests/ > integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor > test is flaky > -- > > Key: KAFKA-15020 > URL: https://issues.apache.org/jira/browse/KAFKA-15020 > Project: Kafka > Issue Type: Test > Components: unit tests >Reporter: Atul Sharma >Priority: Major > Labels: flaky-test > > Sometimes the test fails with the following log: > {code:java} > Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > > FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED > org.opentest4j.AssertionFailedError: Consumed 0 records before timeout > instead of the expected 2 records > at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) > at org.junit.jupiter.api.Assertions.fail(Assertions.java:135) > at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215) > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215) > at > integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244) 
> {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14516) Implement static membership
[ https://issues.apache.org/jira/browse/KAFKA-14516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14516: - Assignee: Sagar Rao (was: David Jacot) > Implement static membership > -- > > Key: KAFKA-14516 > URL: https://issues.apache.org/jira/browse/KAFKA-14516 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Sagar Rao >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14516) Implement static membership
[ https://issues.apache.org/jira/browse/KAFKA-14516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767939#comment-17767939 ] Sagar Rao commented on KAFKA-14516: --- [~dajac], sure, thank you very much! > Implement static membership > -- > > Key: KAFKA-14516 > URL: https://issues.apache.org/jira/browse/KAFKA-14516 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14504) Implement DescribeGroups API
[ https://issues.apache.org/jira/browse/KAFKA-14504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767925#comment-17767925 ] Sagar Rao commented on KAFKA-14504: --- Sure, I would be happy to help :) > Implement DescribeGroups API > > > Key: KAFKA-14504 > URL: https://issues.apache.org/jira/browse/KAFKA-14504 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: Dongnuo Lyu >Priority: Major > > Implement DescribeGroups API in the Group Coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-14504) Implement DescribeGroups API
[ https://issues.apache.org/jira/browse/KAFKA-14504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767920#comment-17767920 ] Sagar Rao edited comment on KAFKA-14504 at 9/22/23 9:23 AM: [~dajac], is it still too urgent? I can take a stab at it if not and it's not already being worked upon. was (Author: sagarrao): [~dajac], is it still too urgent? I can take a stab at it if it's not already being worked upon. > Implement DescribeGroups API > > > Key: KAFKA-14504 > URL: https://issues.apache.org/jira/browse/KAFKA-14504 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major > > Implement DescribeGroups API in the Group Coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14504) Implement DescribeGroups API
[ https://issues.apache.org/jira/browse/KAFKA-14504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767920#comment-17767920 ] Sagar Rao commented on KAFKA-14504: --- [~dajac], is it still too urgent? I can take a stab at it if it's not already being worked upon. > Implement DescribeGroups API > > > Key: KAFKA-14504 > URL: https://issues.apache.org/jira/browse/KAFKA-14504 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Priority: Major > > Implement DescribeGroups API in the Group Coordinator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15460) Add group type filter to ListGroups API
[ https://issues.apache.org/jira/browse/KAFKA-15460?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17767900#comment-17767900 ] Sagar Rao commented on KAFKA-15460: --- [~zhaohaidao] are you currently working on this? > Add group type filter to ListGroups API > --- > > Key: KAFKA-15460 > URL: https://issues.apache.org/jira/browse/KAFKA-15460 > Project: Kafka > Issue Type: Sub-task >Reporter: David Jacot >Assignee: HaiyuanZhao >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-15473) Connect connector-plugins endpoint shows duplicate plugins
[ https://issues.apache.org/jira/browse/KAFKA-15473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766738#comment-17766738 ] Sagar Rao edited comment on KAFKA-15473 at 9/19/23 10:33 AM: - [~satish.duggana], No the API documentation doesn't mention anything about the presence/absence of duplicate entries. This is what it says: {noformat} GET /connector-plugins- return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars {noformat} I think the implicit assumption is that these would always return unique values but as Greg pointed out above, even pre-3.6 there could be cases in which this end point can return duplicate entries. Keeping that in mind, IMO this needn't be a release blocker and we can document it as you suggested. was (Author: sagarrao): [~satish.duggana], No the API documentation doesn't mention anything about the presence/absence of duplicate entries. This is what it says: ??GET /connector-plugins- return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars ?? I think the implicit assumption is that these would always return unique values but as Greg pointed out above, even pre-3.6 there could be cases in which this end point can return duplicate entries. Keeping that in mind, IMO this needn't be a release blocker and we can document it as you suggested. 
> Connect connector-plugins endpoint shows duplicate plugins > -- > > Key: KAFKA-15473 > URL: https://issues.apache.org/jira/browse/KAFKA-15473 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 3.6.0 > > > In <3.6.0-rc0, only one copy of each plugin would be shown. For example: > {noformat} > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > },{noformat} > In 3.6.0-rc0, there are multiple listings for the same plugin. For example: > > {noformat} > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter", > "version": "3.6.0" > },{noformat} > These duplicates appear to happen when a plugin with the same class name > appears in multiple locations/classloaders. > When interpreting a connector configuration, only one of these plugins will > be chosen, so only one is relevant to show to users. The REST API should only > display the plugins which are eligible to be loaded, and hide the duplicates. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15473) Connect connector-plugins endpoint shows duplicate plugins
[ https://issues.apache.org/jira/browse/KAFKA-15473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17766738#comment-17766738 ] Sagar Rao commented on KAFKA-15473: --- [~satish.duggana], No the API documentation doesn't mention anything about the presence/absence of duplicate entries. This is what it says: ??GET /connector-plugins- return a list of connector plugins installed in the Kafka Connect cluster. Note that the API only checks for connectors on the worker that handles the request, which means you may see inconsistent results, especially during a rolling upgrade if you add new connector jars ?? I think the implicit assumption is that these would always return unique values but as Greg pointed out above, even pre-3.6 there could be cases in which this end point can return duplicate entries. Keeping that in mind, IMO this needn't be a release blocker and we can document it as you suggested. > Connect connector-plugins endpoint shows duplicate plugins > -- > > Key: KAFKA-15473 > URL: https://issues.apache.org/jira/browse/KAFKA-15473 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.6.0 >Reporter: Greg Harris >Assignee: Greg Harris >Priority: Major > Fix For: 3.6.0 > > > In <3.6.0-rc0, only one copy of each plugin would be shown. For example: > {noformat} > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > },{noformat} > In 3.6.0-rc0, there are multiple listings for the same plugin. 
For example: > > {noformat} > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter" > }, > { > "class": "org.apache.kafka.connect.storage.StringConverter", > "type": "converter", > "version": "3.6.0" > },{noformat} > These duplicates appear to happen when a plugin with the same class name > appears in multiple locations/classloaders. > When interpreting a connector configuration, only one of these plugins will > be chosen, so only one is relevant to show to users. The REST API should only > display the plugins which are eligible to be loaded, and hide the duplicates. -- This message was sent by Atlassian Jira (v8.20.10#820010)
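The fix the ticket asks for — show only the plugin that would actually be loaded, hiding duplicates — amounts to de-duplicating the REST response by plugin class. A minimal sketch (the sample entries mirror the ticket; the "prefer a versioned entry" rule is an assumption for illustration, not the actual Connect implementation):

```python
def dedupe_plugins(entries):
    """Keep one entry per plugin class, preferring an entry with a version."""
    chosen = {}
    for entry in entries:
        cls = entry["class"]
        current = chosen.get(cls)
        # First sighting wins, unless a later duplicate carries a concrete
        # version and the kept one does not.
        if current is None or ("version" in entry and "version" not in current):
            chosen[cls] = entry
    return list(chosen.values())

# Duplicate listings as reported in KAFKA-15473 for 3.6.0-rc0:
plugins = [
    {"class": "org.apache.kafka.connect.storage.StringConverter", "type": "converter"},
    {"class": "org.apache.kafka.connect.storage.StringConverter", "type": "converter"},
    {"class": "org.apache.kafka.connect.storage.StringConverter", "type": "converter",
     "version": "3.6.0"},
]

deduped = dedupe_plugins(plugins)
# Only the single loadable StringConverter entry remains.
```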
[jira] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
[ https://issues.apache.org/jira/browse/KAFKA-14401 ] Sagar Rao deleted comment on KAFKA-14401: --- was (Author: sagarrao): There are a couple of routes that can be taken here: 1) As discussed, we try to reinitiate the WorkThread in case the work thread dies due to the unhandled exception. This could be useful to have the offsets read making progress and the connectors don't realise it. Problem is if it's a fatal exception, we might still re-create the thread. This could be both a good thing and a bad thing tbh. Of-course we would still log the exception(already happening). 2) Extending the idea from pt#1, we can create a single threaded thread pool which would take care of recreating the thread if it dies. It has the same pros and cons as #1. 3) The other extreme could be to not accept any offsets request and fail the worker somehow. > Connector/Tasks reading offsets can get stuck if underneath WorkThread dies > --- > > Key: KAFKA-14401 > URL: https://issues.apache.org/jira/browse/KAFKA-14401 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > When a connector or task tries to read the offsets from the offsets topic, it > issues `OffsetStorageImpl#offsets` method. This method gets a Future from the > underneath KafkaBackingStore. KafkaBackingStore invokes > `KafkaBasedLog#readToEnd` method and passes the Callback. This method > essentially adds the Callback to a Queue of callbacks that are being managed. > Within KafkaBasedLog, there's a WorkThread which keeps polling over the > callback queue and executes them and it does this in an infinite loop. > However, there is an enclosing try/catch block around the while loop. If > there's an exception thrown which is not caught by any of the other catch > blocks, the control goes to the outermost catch block and the WorkThread is > terminated. 
However, the connectors/tasks are not aware of this and they > would keep submitting callbacks to KafkaBasedLog with nobody processing them. > This can be seen in the thread dumps as well: > > {code:java} > "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s > tid=0x7f8d9c037000 nid=0x5d00 waiting on condition [0x7f8dc08cd000] > java.lang.Thread.State: WAITING (parking) > at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method) > - parking to wait for <0x00070345c9a8> (a > java.util.concurrent.CountDownLatch$Sync) > at > java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345) > at > java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232) > at > org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98) > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101) > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) > {code} > > We need a mechanism to fail all such offset read requests. That is because > even if we restart the thread, chances are it will still fail with the same > error so the offset fetch would be stuck perennially. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
[ https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14401: - Assignee: Sagar Rao (was: Chaitanya Mukka) > Connector/Tasks reading offsets can get stuck if underneath WorkThread dies > --- > > Key: KAFKA-14401 > URL: https://issues.apache.org/jira/browse/KAFKA-14401 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > When a connector or task tries to read the offsets from the offsets topic, it > issues `OffsetStorageImpl#offsets` method. This method gets a Future from the > underneath KafkaBackingStore. KafkaBackingStore invokes > `KafkaBasedLog#readToEnd` method and passes the Callback. This method > essentially adds the Callback to a Queue of callbacks that are being managed. > Within KafkaBasedLog, there's a WorkThread which keeps polling over the > callback queue and executes them and it does this in an infinite loop. > However, there is an enclosing try/catch block around the while loop. If > there's an exception thrown which is not caught by any of the other catch > blocks, the control goes to the outermost catch block and the WorkThread is > terminated. However, the connectors/tasks are not aware of this and they > would keep submitting callbacks to KafkaBasedLog with nobody processing them. 
> This can be seen in the thread dumps as well: > > {code:java} > "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s > tid=0x7f8d9c037000 nid=0x5d00 waiting on condition [0x7f8dc08cd000] > java.lang.Thread.State: WAITING (parking) > at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method) > - parking to wait for <0x00070345c9a8> (a > java.util.concurrent.CountDownLatch$Sync) > at > java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345) > at > java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232) > at > org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98) > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101) > at > org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) > {code} > > We need a mechanism to fail all such offset read requests. That is because > even if we restart the thread, chances are it will still fail with the same > error so the offset fetch would be stuck perennially. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
[ https://issues.apache.org/jira/browse/KAFKA-14401?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-14401: -- Description: When a connector or task tries to read the offsets from the offsets topic, it issues `OffsetStorageImpl#offsets` method. This method gets a Future from the underneath KafkaBackingStore. KafkaBackingStore invokes `KafkaBasedLog#readToEnd` method and passes the Callback. This method essentially adds the Callback to a Queue of callbacks that are being managed. Within KafkaBasedLog, there's a WorkThread which keeps polling over the callback queue and executes them and it does this in an infinite loop. However, there is an enclosing try/catch block around the while loop. If there's an exception thrown which is not caught by any of the other catch blocks, the control goes to the outermost catch block and the WorkThread is terminated. However, the connectors/tasks are not aware of this and they would keep submitting callbacks to KafkaBasedLog with nobody processing them. 
This can be seen in the thread dumps as well: {code:java} "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x7f8d9c037000 nid=0x5d00 waiting on condition [0x7f8dc08cd000] java.lang.Thread.State: WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method) - parking to wait for <0x00070345c9a8> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345) at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232) at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) {code} We need a mechanism to fail all such offset read requests. That is because even if we restart the thread, chances are it will still fail with the same error so the offset fetch would be stuck perennially. was: When a connector or task tries to read the offsets from the offsets topic, it issues `OffsetStorageImpl#offsets` method. This method gets a Future from the underneath KafkaBackingStore. KafkaBackingStore invokes `KafkaBasedLog#readToEnd` method and passes the Callback. This method essentially adds the Callback to a Queue of callbacks that are being managed. 
Within KafkaBasedLog, there's a WorkThread which keeps polling over the callback queue and executes them and it does this in an infinite loop. However, there is an enclosing try/catch block around the while loop. If there's an exception thrown which is not caught by any of the other catch blocks, the control goes to the outermost catch block and the WorkThread is terminated. However, the connectors/tasks are not aware of this and they would keep submitting callbacks to KafkaBasedLog with nobody processing them. This can be seen in the thread dumps as well: {code:java} "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x7f8d9c037000 nid=0x5d00 waiting on condition [0x7f8dc08cd000] java.lang.Thread.State: WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method) - parking to wait for <0x00070345c9a8> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345) at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232) at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) {code} We need a mechanism to restart the WorkThread if it dies.
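The failure mode described above can be modeled with a toy work loop. This is a deliberately simplified Python sketch, not the real Java `KafkaBasedLog`: one thread drains a callback queue inside a try block, an unhandled exception silently kills it, and any callback submitted afterwards is never run, so the caller waiting on it blocks forever:

```python
import queue
import threading

class ToyBasedLog:
    """Toy model of KafkaBasedLog's work thread (illustrative, not the real code)."""

    def __init__(self):
        self.callbacks = queue.Queue()
        self.work_thread = threading.Thread(target=self._work_loop, daemon=True)
        self.work_thread.start()

    def read_to_end(self, callback):
        # Connectors/tasks enqueue callbacks; only the work thread runs them.
        self.callbacks.put(callback)

    def _work_loop(self):
        try:
            while True:
                callback = self.callbacks.get(timeout=5)
                callback()
        except Exception:
            # Mirrors the outermost catch in the ticket: the thread terminates
            # without failing the callbacks still sitting in the queue.
            pass

def poison():
    raise RuntimeError("unhandled error in work thread")

log = ToyBasedLog()
log.read_to_end(poison)            # unhandled exception kills the work thread
log.work_thread.join(timeout=5)

done = threading.Event()
log.read_to_end(done.set)          # submitted after death: nobody will run it
request_completed = done.wait(timeout=0.5)  # a real caller would block forever
```

This is why the ticket concludes that pending offset reads must be explicitly failed: restarting the thread alone would likely just hit the same error again.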
[jira] [Commented] (KAFKA-15408) Restart failed tasks in Kafka Connect up to a configurable max-tries
[ https://issues.apache.org/jira/browse/KAFKA-15408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759648#comment-17759648 ] Sagar Rao commented on KAFKA-15408: --- Hi, the steps are outlined here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals > Restart failed tasks in Kafka Connect up to a configurable max-tries > > > Key: KAFKA-15408 > URL: https://issues.apache.org/jira/browse/KAFKA-15408 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Patrick Pang >Priority: Major > Labels: needs-kip > > h2. Issue > Currently, Kafka Connect just reports failed tasks on REST API, with the > error. Users are expected to monitor the status and restart individual > connectors if there is transient errors. Unfortunately these are common for > database connectors, e.g. transient connection error, flip of DNS, database > downtime, etc. Kafka Connect silently failing due to these scenarios would > lead to stale data downstream. > h2. Proposal > Kafka Connect should be able to restart failed tasks automatically, up to a > configurable max-tries. > h2. Prior arts > * > [https://github.com/strimzi/proposals/blob/main/007-restarting-kafka-connect-connectors-and-tasks.md] > > * > [https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/enable-automatic-restart] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15408) Restart failed tasks in Kafka Connect up to a configurable max-tries
[ https://issues.apache.org/jira/browse/KAFKA-15408?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15408: -- Labels: needs-kip (was: ) > Restart failed tasks in Kafka Connect up to a configurable max-tries > > > Key: KAFKA-15408 > URL: https://issues.apache.org/jira/browse/KAFKA-15408 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Patrick Pang >Priority: Major > Labels: needs-kip > > h2. Issue > Currently, Kafka Connect just reports failed tasks on REST API, with the > error. Users are expected to monitor the status and restart individual > connectors if there is transient errors. Unfortunately these are common for > database connectors, e.g. transient connection error, flip of DNS, database > downtime, etc. Kafka Connect silently failing due to these scenarios would > lead to stale data downstream. > h2. Proposal > Kafka Connect should be able to restart failed tasks automatically, up to a > configurable max-tries. > h2. Prior arts > * > [https://github.com/strimzi/proposals/blob/main/007-restarting-kafka-connect-connectors-and-tasks.md] > > * > [https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/enable-automatic-restart] > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15408) Restart failed tasks in Kafka Connect up to a configurable max-tries
[ https://issues.apache.org/jira/browse/KAFKA-15408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17759529#comment-17759529 ] Sagar Rao commented on KAFKA-15408: --- [~patrickpang], thanks for filing this! IMO, this is a feature which is long overdue in the Connect framework. Do you plan to pick this one up? I ask because if the answer is yes, we would need a KIP, considering we might change some behaviour around how the status endpoint responds, e.g. it might not reflect a task failure as soon as the task fails. Also, the configurable max-tries likely means the addition of a new config. > Restart failed tasks in Kafka Connect up to a configurable max-tries > > > Key: KAFKA-15408 > URL: https://issues.apache.org/jira/browse/KAFKA-15408 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Patrick Pang >Priority: Major > > h2. Issue > Currently, Kafka Connect just reports failed tasks on REST API, with the > error. Users are expected to monitor the status and restart individual > connectors if there is transient errors. Unfortunately these are common for > database connectors, e.g. transient connection error, flip of DNS, database > downtime, etc. Kafka Connect silently failing due to these scenarios would > lead to stale data downstream. > h2. Proposal > Kafka Connect should be able to restart failed tasks automatically, up to a > configurable max-tries. > h2. Prior arts > * > [https://github.com/strimzi/proposals/blob/main/007-restarting-kafka-connect-connectors-and-tasks.md] > > * > [https://docs.aiven.io/docs/products/kafka/kafka-connect/howto/enable-automatic-restart]
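The proposed behaviour, restarting a failed task automatically up to a bounded number of attempts, can be sketched as follows. This is an illustrative model only: `MAX_TRIES` and the backoff values are hypothetical, not existing Connect configs.

```java
public class BoundedRestart {
    // Hypothetical knob standing in for the proposed "max-tries" config.
    static final int MAX_TRIES = 3;

    interface Task {
        void start() throws Exception;
    }

    // Attempt to bring a failed task back to RUNNING, with exponential
    // backoff between attempts; give up after MAX_TRIES and let the
    // status endpoint surface FAILED.
    static boolean restartWithRetries(Task task) throws InterruptedException {
        long backoffMs = 100;
        for (int attempt = 1; attempt <= MAX_TRIES; attempt++) {
            try {
                task.start();
                return true; // task is RUNNING again
            } catch (Exception e) {
                if (attempt == MAX_TRIES) {
                    break; // exhausted: report FAILED with the last error
                }
                Thread.sleep(backoffMs);
                backoffMs *= 2; // back off before the next restart
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Simulated transient failure: fails twice, then starts cleanly.
        Task flaky = () -> {
            if (++calls[0] < 3) throw new Exception("transient connection error");
        };
        System.out.println("restarted: " + restartWithRetries(flaky));
        System.out.println("attempts: " + calls[0]);
    }
}
```

A KIP would additionally have to define how intermediate states are reported while retries are in flight, which is exactly the behaviour change noted above.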
[jira] [Commented] (KAFKA-14429) Move OffsetStorageReader from storage package to source package
[ https://issues.apache.org/jira/browse/KAFKA-14429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17756308#comment-17756308 ] Sagar Rao commented on KAFKA-14429: --- hey [~gharris1727], given Chris's comment above, do you think we can close this one ? > Move OffsetStorageReader from storage package to source package > --- > > Key: KAFKA-14429 > URL: https://issues.apache.org/jira/browse/KAFKA-14429 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Reporter: Greg Harris >Priority: Minor > Labels: needs-kip > > The OffsetStorageReader is an interface provided to source connectors. This > does not fit with the broader context of the `storage` package, which is > focused on sink/source-agnostic converters and serialization/deserialization. > The current interface should be deprecated and extend from the relocated > interface in a different package. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14138) The Exception Throwing Behavior of Transactional Producer is Inconsistent
[ https://issues.apache.org/jira/browse/KAFKA-14138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14138: - Assignee: (was: Sagar Rao) > The Exception Throwing Behavior of Transactional Producer is Inconsistent > - > > Key: KAFKA-14138 > URL: https://issues.apache.org/jira/browse/KAFKA-14138 > Project: Kafka > Issue Type: Improvement > Components: producer >Reporter: Guozhang Wang >Priority: Critical > > There's an issue for inconsistent error throwing inside Kafka Producer when > transactions are enabled. In short, there are two places where the received > error code from the brokers would be eventually thrown to the caller: > * Recorded on the batch's metadata, via "Sender#failBatch" > * Recorded on the txn manager, via "txnManager#handleFailedBatch". > The former would be thrown from 1) the `Future` returned from > the `send`; or 2) the `callback` inside `send(record, callback)`. Whereas, > the latter would be thrown from `producer.send()` directly in which we call > `txnManager.maybeAddPartition -> maybeFailWithError`. However, when thrown > from the former, it's not wrapped hence the direct exception (e.g. > ClusterAuthorizationException), whereas in the latter it's wrapped as, e.g. > KafkaException(ClusterAuthorizationException). And which one would be thrown > depend on a race condition since we cannot control by the time the caller > thread calls `txnManager.maybeAddPartition`, if the previous produceRequest's > error has been sent back or not. > For example consider the following sequence for idempotent producer: > 1. caller thread: within future = producer.send(), call > recordAccumulator.append > 2. sender thread: drain the accumulator, send the produceRequest and get the > error back. > 3. caller thread: within future = producer.send(), call > txnManager.maybeAddPartition, in which we would check `maybeFailWithError` > before `isTransactional`. > 4. 
caller thread: future.get() > In a sequence where 3) happened before 2), we would only get the raw > exception at step 4; in a sequence where 2) happened before 3), we would > throw the wrapped exception immediately at 3). > This inconsistent error throwing is pretty annoying for users since they'd > need to handle both cases, but many of them actually do not know about this > trickiness. We should make the error throwing consistent, e.g. we should > consider: 1) which errors would be thrown from the callback / future.get and > which would be thrown from the `send` call directly, and these errors should > ideally be non-overlapping; 2) whether we wrap the raw error or not, we > should do so consistently.
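Until the behaviour is made consistent, callers can normalize the two throw paths by unwrapping to the root cause. A minimal sketch in plain Java, with `ExecutionException` and a generic `RuntimeException` standing in for the future-path and send-path wrappers respectively:

```java
import java.util.concurrent.ExecutionException;

public class UnwrapProducerError {
    // Peel wrapper layers (e.g. ExecutionException from future.get(), or a
    // framework wrapper thrown from send()) down to the root cause, so both
    // surfaces can be handled with one code path.
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null && t.getCause() != t) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        // Path 1: error surfaced from future.get(), wrapped once.
        Throwable fromFuture =
            new ExecutionException(new IllegalStateException("authorization failed"));
        // Path 2: the same error thrown from send() inside a generic wrapper.
        Throwable fromSend =
            new RuntimeException(new IllegalStateException("authorization failed"));

        System.out.println(rootCause(fromFuture).getMessage());
        System.out.println(rootCause(fromSend).getMessage());
    }
}
```

Both calls print the same root-cause message, which is the property user code actually wants regardless of which race outcome occurred.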
[jira] [Comment Edited] (KAFKA-15339) Transient I/O error happening in appending records could lead to the halt of whole cluster
[ https://issues.apache.org/jira/browse/KAFKA-15339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755812#comment-17755812 ] Sagar Rao edited comment on KAFKA-15339 at 8/18/23 5:52 AM: Thanks for reporting this bug [~functioner]. As you correctly pointed out, there seems to be no retry mechanism while trying to append files to the topic which is backed by a file. From what I can see, whenever it fails, `LocalLog#maybeHandleIOException` is invoked which marks the given directory as offline which I believe triggers the rest of the errors that you see on the Connect cluster. While the suggestion you provided is a valid one i.e to retry in such cases, I couldn't find that pattern being used anywhere in this part of the code. Having said that, have you noticed this error being thrown for versions < 3.5? The reason I ask that is that in v3.5, a migration of this class to the storage module was done. Here's the [link|https://github.com/apache/kafka/pull/13041] of the PR. AFAICS, there is not much difference and we should have experienced the same behaviour but was just curious. LMK. Or else, we can look at adding the retry mechanism but I am not sure how big/small that change would be considering these classes like UnifiedLog etc seem to be on a critical path. was (Author: sagarrao): Thanks for reporting this bug [~functioner]. As you correctly pointed out, there seems to be no retry mechanism while trying to append files to the topic which is backed by a file. From what I can see, whenever it fails, `LocalLog#maybeHandleIOException` is invoked which marks the given directory as offline which I believe triggers the rest of the errors that you see on the Connect cluster. While the suggestion you provided is a valid one i.e to retry in such cases, I couldn't find that pattern being used anywhere in this part of the code. Having said that, have you noticed this error being thrown for versions < 3.5? 
The reason I ask that is that in v3.5, a migration of this class to the storage module was done. Here's the [link|https://github.com/apache/kafka/pull/13041] of the PR. AFAICS, there is not much difference and we should have experienced the same behaviour but was just curious. LMK. Or else, we can look at adding the retry mechanism > Transient I/O error happening in appending records could lead to the halt of > whole cluster > -- > > Key: KAFKA-15339 > URL: https://issues.apache.org/jira/browse/KAFKA-15339 > Project: Kafka > Issue Type: Improvement > Components: connect, producer >Affects Versions: 3.5.0 >Reporter: Haoze Wu >Priority: Major > > We are running an integration test in which we start an Embedded Connect > Cluster in the active 3.5 branch. However, because of transient disk error, > we may encounter an IOException during appending records to one topic. As > shown in the stack trace: > {code:java} > [2023-08-13 16:53:51,016] ERROR Error while appending records to > connect-config-topic-connect-cluster-0 in dir > /tmp/EmbeddedKafkaCluster8003464883598783225 > (org.apache.kafka.storage.internals.log.LogDirFailureChannel:61) > java.io.IOException: > at > org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:92) > at > org.apache.kafka.common.record.FileRecords.append(FileRecords.java:188) > at kafka.log.LogSegment.append(LogSegment.scala:161) > at kafka.log.LocalLog.append(LocalLog.scala:436) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:853) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:664) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1281) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1269) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:977) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at 
scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:965) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:623) > at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:680) > at kafka.server.KafkaApis.handle(KafkaApis.scala:180) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > at java.lang.Thread.run(Thread.java:748) {code} > However, just because the append to one partition failed, the fetchers > for all the other partitions are removed, the broker is shut down, and > finally the embedded connect cluster is killed as a whole.
[jira] [Commented] (KAFKA-15339) Transient I/O error happening in appending records could lead to the halt of whole cluster
[ https://issues.apache.org/jira/browse/KAFKA-15339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755812#comment-17755812 ] Sagar Rao commented on KAFKA-15339: --- Thanks for reporting this bug [~functioner]. As you correctly pointed out, there seems to be no retry mechanism while trying to append files to the topic which is backed by a file. From what I can see, whenever it fails, `LocalLog#maybeHandleIOException` is invoked which marks the given directory as offline which I believe triggers the rest of the errors that you see on the Connect cluster. While the suggestion you provided is a valid one i.e to retry in such cases, I couldn't find that pattern being used anywhere in this part of the code. Having said that, have you noticed this error being thrown for versions < 3.5? The reason I ask that is that in v3.5, a migration of this class to the storage module was done. Here's the [link|https://github.com/apache/kafka/pull/13041] of the PR. AFAICS, there is not much difference and we should have experienced the same behaviour but was just curious. LMK. Or else, we can look at adding the retry mechanism > Transient I/O error happening in appending records could lead to the halt of > whole cluster > -- > > Key: KAFKA-15339 > URL: https://issues.apache.org/jira/browse/KAFKA-15339 > Project: Kafka > Issue Type: Improvement > Components: connect, producer >Affects Versions: 3.5.0 >Reporter: Haoze Wu >Priority: Major > > We are running an integration test in which we start an Embedded Connect > Cluster in the active 3.5 branch. However, because of transient disk error, > we may encounter an IOException during appending records to one topic. 
As > shown in the stack trace: > {code:java} > [2023-08-13 16:53:51,016] ERROR Error while appending records to > connect-config-topic-connect-cluster-0 in dir > /tmp/EmbeddedKafkaCluster8003464883598783225 > (org.apache.kafka.storage.internals.log.LogDirFailureChannel:61) > java.io.IOException: > at > org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:92) > at > org.apache.kafka.common.record.FileRecords.append(FileRecords.java:188) > at kafka.log.LogSegment.append(LogSegment.scala:161) > at kafka.log.LocalLog.append(LocalLog.scala:436) > at kafka.log.UnifiedLog.append(UnifiedLog.scala:853) > at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:664) > at > kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1281) > at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1269) > at > kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:977) > at > scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28) > at > scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27) > at scala.collection.mutable.HashMap.map(HashMap.scala:35) > at > kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:965) > at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:623) > at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:680) > at kafka.server.KafkaApis.handle(KafkaApis.scala:180) > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > at java.lang.Thread.run(Thread.java:748) {code} > However, just because of failing to append the records to one partition. The > fetcher for all the other partitions are removed, broker shutdown, and > finally embedded connect cluster killed as whole. 
> {code:java} > [2023-08-13 17:35:37,966] WARN Stopping serving logs in dir > /tmp/EmbeddedKafkaCluster6777164631574762227 (kafka.log.LogManager:70) > [2023-08-13 17:35:37,968] ERROR Shutdown broker because all log dirs in > /tmp/EmbeddedKafkaCluster6777164631574762227 have failed > (kafka.log.LogManager:143) > [2023-08-13 17:35:37,968] WARN Abrupt service halt with code 1 and message > null (org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster:130) > [2023-08-13 17:35:37,968] ERROR [LogDirFailureHandler]: Error due to > (kafka.server.ReplicaManager$LogDirFailureHandler:135) > org.apache.kafka.connect.util.clusters.UngracefulShutdownException: Abrupt > service halt with code 1 and message null {code} > I am wondering if we could add configurable retry around the root cause to > tolerate the possible I/O faults so that if the retry is successful, the > embedded connect cluster could still operate. > Any comments and suggestions would be appreciated. -- This message was sent by Atlassian Jira (v8.20.10#820010)
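The configurable retry the reporter asks for could look roughly like the sketch below. This is illustrative only: the real change would sit around the append path guarded by `LocalLog#maybeHandleIOException`, and the retry bound here is a made-up parameter.

```java
import java.io.IOException;

public class RetryAppend {
    interface Append {
        void run() throws IOException;
    }

    // Retry a log append a bounded number of times before escalating
    // (i.e. before marking the log directory offline).
    static void appendWithRetry(Append append, int maxRetries) throws IOException {
        IOException last = null;
        for (int i = 0; i <= maxRetries; i++) {
            try {
                append.run();
                return; // transient fault absorbed, cluster keeps running
            } catch (IOException e) {
                last = e; // possibly transient disk error: try again
            }
        }
        throw last; // persistent fault: now escalate as today
    }

    public static void main(String[] args) throws IOException {
        int[] calls = {0};
        // Simulated transient disk error: first attempt fails, second succeeds.
        Append flaky = () -> {
            if (++calls[0] < 2) throw new IOException("disk hiccup");
        };
        appendWithRetry(flaky, 2);
        System.out.println("append succeeded after " + calls[0] + " attempts");
    }
}
```

As noted in the comment, UnifiedLog and friends are on the hot path, so any real retry would need careful bounds to avoid stalling produce requests.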
[jira] [Commented] (KAFKA-15354) Partition leader is not evenly distributed in kraft mode
[ https://issues.apache.org/jira/browse/KAFKA-15354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17755805#comment-17755805 ] Sagar Rao commented on KAFKA-15354: --- [~dengziming], I took a look at this. I believe this is happening because when we are trying to find the first replica of a new partition, [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java#L362], we set the index back to 0 when the epochs don't match [here|https://github.com/apache/kafka/blob/trunk/metadata/src/main/java/org/apache/kafka/metadata/placement/StripedReplicaPlacer.java#L190]. In the test case you supplied, when we are adding partition 2, the epoch known to the brokers in rack 1 is 1 but the new incoming epoch is 2. So, the index is reset back to 0. I think that's why in this round as well we see broker 1 being assigned the leader. WDYT? > Partition leader is not evenly distributed in kraft mode > > > Key: KAFKA-15354 > URL: https://issues.apache.org/jira/browse/KAFKA-15354 > Project: Kafka > Issue Type: Bug >Reporter: Deng Ziming >Priority: Major > > In StripedReplicaPlacerTest, we can create a test below to reproduce this bug. > {code:java} > // code placeholder > @Test > public void testReplicaDistribution() { > MockRandom random = new MockRandom(); > StripedReplicaPlacer placer = new StripedReplicaPlacer(random); > TopicAssignment assignment = place(placer, 0, 4, (short) 2, Arrays.asList( > new UsableBroker(0, Optional.of("0"), false), > new UsableBroker(1, Optional.of("0"), false), > new UsableBroker(2, Optional.of("1"), false), > new UsableBroker(3, Optional.of("1"), false))); > System.out.println(assignment); > } {code} > In StripedReplicaPlacer, we only ensure leader are distributed evenly across > racks, but we didn't ensure leader are evenly distributed across nodes. in > the test above, we have 4 node: 1 2 3 4, and create 4 partitions but the > leaders are 1 2 1 2. 
while in zk mode, this is ensured, see > https://cwiki.apache.org/confluence/display/KAFKA/KIP-36+Rack+aware+replica+assignment -- This message was sent by Atlassian Jira (v8.20.10#820010)
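The index-reset behaviour described in the comment can be reproduced with a toy placer. This is a simplified model, not the real StripedReplicaPlacer: resetting the starting index for every new partition piles the leaders onto the same broker of each rack, while rotating it spreads them across brokers.

```java
import java.util.ArrayList;
import java.util.List;

public class LeaderSpread {
    // Toy model: 2 racks with 2 brokers each; the leader is the first replica.
    static List<Integer> leaders(int partitions, boolean resetOffset) {
        int[][] racks = {{0, 1}, {2, 3}};
        List<Integer> leaders = new ArrayList<>();
        int offset = 0; // index of the broker chosen first within a rack
        for (int p = 0; p < partitions; p++) {
            int[] rack = racks[p % racks.length]; // alternate racks per partition
            leaders.add(rack[offset % rack.length]);
            if (!resetOffset && p % racks.length == racks.length - 1) {
                offset++; // rotate the starting broker after a full rack cycle
            }
        }
        return leaders;
    }

    public static void main(String[] args) {
        // Resetting per partition (the reported behaviour): leaders repeat.
        System.out.println("reset each partition: " + leaders(4, true));
        // Rotating across cycles: leaders cover all four brokers.
        System.out.println("rotate across cycles: " + leaders(4, false));
    }
}
```

The first line mirrors the test case in the issue, where two brokers end up holding all four leaders even though four brokers are available.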
[jira] [Commented] (KAFKA-15334) DescribeQuorum should not be seen as cluster action
[ https://issues.apache.org/jira/browse/KAFKA-15334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17753575#comment-17753575 ] Sagar Rao commented on KAFKA-15334: --- hey [~dengziming] if it isn't too critical, can I take a stab at it? I am new to this part of the code but when I took a quick glance, it appears to me that we need to do something similar to what is being done currently for `handleElectLeaders` within ControllerApis. > DescribeQuorum should not be seen as cluster action > > > Key: KAFKA-15334 > URL: https://issues.apache.org/jira/browse/KAFKA-15334 > Project: Kafka > Issue Type: Bug >Reporter: Deng Ziming >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
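The direction suggested in the comment, authorizing DescribeQuorum with a user-level DESCRIBE check rather than treating it as a cluster action (similar to what `handleElectLeaders` does), can be shown with a toy authorization table. All names here are illustrative, not the actual ControllerApis code:

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

public class QuorumAuth {
    enum Op { DESCRIBE, CLUSTER_ACTION }

    // Toy table: which operation each API requires on the cluster resource.
    // Mapping DescribeQuorum to DESCRIBE instead of CLUSTER_ACTION lets
    // ordinary admin principals call it, while inter-broker APIs stay locked.
    static final Map<String, Op> REQUIRED = Map.of(
        "Fetch(follower)", Op.CLUSTER_ACTION, // true inter-broker API
        "DescribeQuorum", Op.DESCRIBE         // user-facing, read-only API
    );

    static boolean authorize(String api, Set<Op> granted) {
        return granted.contains(REQUIRED.get(api));
    }

    public static void main(String[] args) {
        // A principal that only holds DESCRIBE on the cluster resource:
        Set<Op> adminUser = EnumSet.of(Op.DESCRIBE);
        System.out.println("DescribeQuorum allowed: " + authorize("DescribeQuorum", adminUser));
        System.out.println("Fetch(follower) allowed: " + authorize("Fetch(follower)", adminUser));
    }
}
```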
[jira] [Created] (KAFKA-15296) Allow committing offsets for Dropped records via SMTs
Sagar Rao created KAFKA-15296: - Summary: Allow committing offsets for Dropped records via SMTs Key: KAFKA-15296 URL: https://issues.apache.org/jira/browse/KAFKA-15296 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sagar Rao Assignee: Sagar Rao Currently the connect Runtime doesn't commit the offsets of records which have been dropped due to SMT. This can lead to issues if the dropped record's partition reflects a source partition and the connector depends upon the committed offsets to make progress. In such cases, the connector might just stall. We should enable committing offsets for dropped records as well. Note that today if a record is dropped because exactly-once support is enabled and the connector chose to abort the batch containing the record, then its offset is still committed. So there already exists a discrepancy in the way the runtime treats these dropped records. -- This message was sent by Atlassian Jira (v8.20.10#820010)
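For context, records are typically dropped via the built-in Filter SMT guarded by a predicate; the runtime then skips the dropped records without committing their offsets, which is the gap this ticket describes. A sketch of such a connector config (the connector name and topic pattern are made up for illustration):

```json
{
  "name": "my-source-connector",
  "config": {
    "transforms": "dropSkipped",
    "transforms.dropSkipped.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.dropSkipped.predicate": "onSkipTopic",
    "predicates": "onSkipTopic",
    "predicates.onSkipTopic.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.onSkipTopic.pattern": "skip-.*"
  }
}
```

Under this proposal, a record routed to a `skip-...` topic would still have its source offset committed after being dropped, so the connector can make progress.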
[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15229: -- Description: The Kafka Connect config [task.shutdown.graceful.timeout.ms. |https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has a default value of 5s. As per it's definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shutdown. Also, if multiple tasks are to be shutdown then, they are waited upon sequentially. Now the default value of this config is ok for smaller clusters with less number of tasks, on a larger cluster because the timeout can elapse we will see a lot of messages of the form {noformat} Graceful stop of task failed. {noformat} In case of failure in graceful stop of tasks, the tasks are cancelled which means that they won't send out a status update. Once that happens there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use Incremental Cooperative Assignor, then the task wouldn't be reassigned until scheduled.rebalance.delay.max.ms interval elapses. So, for that amount of duration, the task would show up with status RUNNING whenever it's status is queried for. This can be confusing for the users. This problem can be exacerbated on cloud environments(like kubernetes pods) because there is a high chance that the running status would be associated with an older worker_id which doesn't even exist in the cluster anymore. While the net effect of all of this is not catastrophic i.e it won't lead to any processing delays or loss of data but the status of the task would be off. 
And if there are fast rebalances happening under Incremental Cooperative Assignor, then that duration could be high as well. So, the proposal is to increase the default value to a higher value. I am thinking we can set it to 60s because as far as I can see, it doesn't interfere with any other timeout that we have. Also while users can set this config on their clusters, this is a low importance config and generally goes unnoticed. So I believe we should look to increase the default value specially for future users of the clusters who might not look at this config in detail. was: The Kafka Connect config [task.shutdown.graceful.timeout.ms. |https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has a default value of 5s. As per it's definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shutdown. Also, if multiple tasks are to be shutdown then, they are waited upon sequentially. Now the default value of this config is ok for smaller clusters with less number of tasks, on a larger cluster because the timeout can elapse we will see a lot of messages of the form {noformat} Graceful stop of task failed. {noformat} In case of failure in graceful stop of tasks, the tasks are cancelled which means that they won't send out a status update. Once that happens there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use Incremental Cooperative Assignor, then the task wouldn't be reassigned until scheduled.rebalance.delay.max.ms interval elapses. So, for that amount of duration, the task would show up with status RUNNING whenever it's status is queried for. This can be confusing for the users. 
This problem can be exacerbated on cloud environments(like kubernetes pods) because there is a high chance that the running status would be associated with an older worker_id which doesn't even exist in the cluster anymore. While the net effect of all of this is not catastrophic i.e it won't lead to any processing delays or loss of data but the status of the task would be off. And if there are fast rebalances happening under Incremental Cooperative Assignor, then that duration could be high as well. So, the proposal is to increase the default value to a higher value. I am thinking we can set it to 60s because as far as I can see, it doesn't interfere with any other timeout that we have. Also while users can set this config on their clusters, this is a low importance config and generally goes unnoticed. So I believe we should look to increase the default value. > Increase default value of task.shutdown.graceful.timeout.ms > --- > >
[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15229: -- Description: The Kafka Connect config [task.shutdown.graceful.timeout.ms. |https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has a default value of 5s. As per it's definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shutdown. Also, if multiple tasks are to be shutdown then, they are waited upon sequentially. Now the default value of this config is ok for smaller clusters with less number of tasks, on a larger cluster because the timeout can elapse we will see a lot of messages of the form {noformat} Graceful stop of task failed. {noformat} In case of failure in graceful stop of tasks, the tasks are cancelled which means that they won't send out a status update. Once that happens there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use Incremental Cooperative Assignor, then the task wouldn't be reassigned until scheduled.rebalance.delay.max.ms interval elapses. So, for that amount of duration, the task would show up with status RUNNING whenever it's status is queried for. This can be confusing for the users. This problem can be exacerbated on cloud environments(like kubernetes pods) because there is a high chance that the running status would be associated with an older worker_id which doesn't even exist in the cluster anymore. While the net effect of all of this is not catastrophic i.e it won't lead to any processing delays or loss of data but the status of the task would be off. 
And if there are fast rebalances happening under Incremental Cooperative Assignor, then that duration could be high as well. So, the proposal is to increase the default value to a higher value. I am thinking we can set it to 60s because as far as I can see, it doesn't interfere with any other timeout that we have. Also while users can set this config on their clusters, this is a low importance config and generally goes unnoticed. So I believe we should look to increase the default value. was: The Kafka Connect config [task.shutdown.graceful.timeout.ms. |https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has a default value of 5s. As per it's definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shutdown. Also, if multiple tasks are to be shutdown then, they are waited upon sequentially. Now the default value of this config is ok for smaller clusters with less number of tasks, on a larger cluster because the timeout can elapse we will see a lot of messages of the form {noformat} Graceful stop of task failed. {noformat} In case of failure in graceful stop of tasks, the tasks are cancelled which means that they won't send out a status update. Once that happens there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use Incremental Cooperative Assignor, then the task wouldn't be reassigned until scheduled.rebalance.delay.max.ms interval elapses. So, for that amount of duration, the task would show up with status RUNNING whenever it's status is queried for. This can be confusing for the users. 
This problem can be exacerbated on cloud environments(like kubernetes pods) because there is a high chance that the running status would be associated with an older worker_id which doesn't even exist in the cluster anymore. While the net effect of all of this is not catastrophic i.e it won't lead to any processing delays or loss of data but the status of the task would be off. And if there are fast rebalances happening under Incremental Cooperative Assignor, then that duration could be high as well. So, the proposal is to increase the default value to a higher value. I am thinking we can set it to 60s because as far as I can see, it doesn't interfere with any other timeout that we have. Also while users can set this config on their clusters, this is a low importance config and generally goes unnoticed. So I believe we should look to increase the default value. I am tagging this as need-kip because I believe we will need one. > Increase default value of task.shutdown.graceful.timeout.ms > --- > > Key:
[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15229: -- Labels: (was: needs-kip) > Increase default value of task.shutdown.graceful.timeout.ms > --- > > Key: KAFKA-15229 > URL: https://issues.apache.org/jira/browse/KAFKA-15229 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > The Kafka Connect config [task.shutdown.graceful.timeout.ms. > |https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has > a default value of 5s. As per its definition: > > {noformat} > Amount of time to wait for tasks to shutdown gracefully. This is the total > amount of time, not per task. All task have shutdown triggered, then they are > waited on sequentially.{noformat} > it is the total timeout for all tasks to shut down. Also, if multiple tasks > are to be shut down, they are waited upon sequentially. Now the default > value of this config is fine for smaller clusters with fewer tasks, but on > a larger cluster the timeout can elapse and we will see a lot of messages > of the form > {noformat} > Graceful stop of task failed. > {noformat} > In case of failure in graceful stop of tasks, the tasks are cancelled, which > means that they won't send out a status update. Once that happens there won't > be any `UNASSIGNED` status message posted for that task. Let's say the task > stop was triggered by a worker going down. If the cluster is configured to > use the Incremental Cooperative Assignor, then the task wouldn't be reassigned > until the scheduled.rebalance.delay.max.ms interval elapses. So, for that > duration, the task would show up with status RUNNING whenever its status > is queried. This can be confusing for the users. 
> This problem can be exacerbated in cloud environments (like Kubernetes pods) > because there is a high chance that the RUNNING status would be associated > with an older worker_id that no longer exists in the cluster. > While the net effect of all of this is not catastrophic, i.e. it won't lead to > any processing delays or loss of data, the status of the task would be > wrong. And if fast rebalances are happening under the Incremental Cooperative > Assignor, that duration could be long as well. > So, the proposal is to increase the default value. I am thinking we can set it > to 60s because, as far as I can see, it doesn't interfere with any other > timeout that we have. > Also, while users can set this config on their clusters, it is a low-importance > config and generally goes unnoticed, so I believe we should increase the > default value. > I am tagging this as needs-kip because I believe we will need one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
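To make the "total, not per task" semantics concrete, here is a minimal, hypothetical Java sketch. It is not the actual Connect Worker code; the class and method names (`GracefulStopBudget`, `awaitAll`) are invented. All task stops share one deadline and are waited on sequentially, so a single slow task can starve every task behind it in the list:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch (invented names; not the actual Connect Worker code) of a
// graceful-stop budget that is total across all tasks rather than per task.
public class GracefulStopBudget {

    // Waits on each stop future in order against one shared deadline and
    // returns how many tasks finished within the budget.
    public static int awaitAll(List<Future<?>> taskStops, long totalTimeoutMs) {
        long deadline = System.currentTimeMillis() + totalTimeoutMs;
        int stopped = 0;
        for (Future<?> stop : taskStops) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break; // budget exhausted: remaining tasks never get a chance
            }
            try {
                stop.get(remaining, TimeUnit.MILLISECONDS);
                stopped++;
            } catch (TimeoutException e) {
                break; // this is where "Graceful stop of task failed." would be logged
            } catch (InterruptedException | ExecutionException e) {
                break;
            }
        }
        return stopped;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<Future<?>> stops = new ArrayList<>();
        stops.add(pool.submit(() -> sleep(50)));   // fast stop
        stops.add(pool.submit(() -> sleep(50)));   // fast stop
        stops.add(pool.submit(() -> sleep(5000))); // slow stop blows the shared budget
        // With a 200 ms *total* budget, typically only the two fast tasks stop in time.
        System.out.println("stopped gracefully: " + awaitAll(stops, 200));
        pool.shutdownNow();
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }
}
```

This is why the 5s default degrades on large clusters: the more tasks a worker hosts, the smaller each task's effective share of the single budget becomes.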
[jira] [Commented] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Long for schema-less case
[ https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746881#comment-17746881 ] Sagar Rao commented on KAFKA-10457: --- Thanks Yash. Yeah, probably due to the workaround it hasn't received enough traction. Do you think we should wait for some other users to report similar errors (it's been around for a while though), or should we go ahead and add a new datatype? I see no harm in adding a new datatype though. > JsonConverter.toConnectData trims BigInteger to Long for schema-less case > - > > Key: KAFKA-10457 > URL: https://issues.apache.org/jira/browse/KAFKA-10457 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Critical > > > When _JsonConverter_ is configured with _schemas.enable=false_ and a value > exceeding _Long_ is passed, the result is incorrect since the converter > trims it to _Long_: > {code:java} > Map<String, Boolean> props = Collections.singletonMap("schemas.enable", > false); > converter.configure(props, true); > BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new > BigInteger("1")); > String msg = value.toString(); > SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, > msg.getBytes()); > assertNull(schemaAndValue.schema()); > assertEquals(value, schemaAndValue.value()); > {code} > > Fails with: > > {code:java} > expected:<9223372036854775808> but was:<-9223372036854775808> > Expected :9223372036854775808 > Actual :-9223372036854775808 > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
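The expected/actual pair in the failing assertion follows from two's-complement narrowing. A minimal Java illustration (this shows plain `BigInteger` behavior, not the `JsonConverter` code path itself): `longValue()` keeps only the low 64 bits of the value, so Long.MAX_VALUE + 1 (2^63) reads back as Long.MIN_VALUE.

```java
import java.math.BigInteger;

// Plain BigInteger behavior (not the JsonConverter code itself): longValue()
// keeps only the low 64 bits of the value, so Long.MAX_VALUE + 1 (2^63) reads
// back, in two's complement, as Long.MIN_VALUE, exactly the pair of values
// seen in the failing assertion above.
public class BigIntegerNarrowingDemo {

    public static long narrowed() {
        BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(BigInteger.ONE);
        return value.longValue(); // 9223372036854775808 -> -9223372036854775808
    }

    public static void main(String[] args) {
        System.out.println(narrowed()); // prints -9223372036854775808
    }
}
```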
[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct
[ https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746774#comment-17746774 ] Sagar Rao commented on KAFKA-15005: --- [~LucentWong], the PR for https://issues.apache.org/jira/browse/KAFKA-12525 has been merged. Closing this one. Thanks! > Status of KafkaConnect task not correct > --- > > Key: KAFKA-15005 > URL: https://issues.apache.org/jira/browse/KAFKA-15005 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 3.0.0, 3.3.2 >Reporter: Yu Wang >Priority: Major > > Our MM2 is running version 2.5.1. > After a rebalance of our MM2 source tasks, we found there were several tasks > always in *UNASSIGNED* status, even though the real tasks had already started. > So we dumped the payload of the status topic of Kafka Connect, and found the > last two status changes were *RUNNING* followed by > {*}UNASSIGNED{*}. > {code:java} > LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643} > LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643} > {code} > But usually, the RUNNING status should be appended after the UNASSIGNED one, > because the worker coordinator revokes tasks before starting new tasks. > Then we checked the logs of our MM2 workers and found that, during that time, > there was a task that was revoked on worker-2 and started on worker-1. 
> > Worker-1 > {code:java} > [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, > groupId=__group] Starting task task-7 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [2023-05-15 09:24:45,951] INFO Creating task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > Worker-2 > {code:java} > [2023-05-15 09:24:40,922] INFO Stopping task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > > So I think the incorrect status was caused by the revoked task finishing later > than the newly started task, which made the UNASSIGNED status get appended to > the status topic after the RUNNING status. > > After reading the code of DistributedHerder, I found that task revocation runs > in a thread pool, and the revoke operation just returns after submitting all > the callables. So I think even on the same worker, there is no guarantee > that the revoke operation will always finish before the new tasks start. > {code:java} > for (final ConnectorTaskId taskId : tasks) { > callables.add(getTaskStoppingCallable(taskId)); > } > // The actual timeout for graceful task/connector stop is applied in worker's > // stopAndAwaitTask/stopAndAwaitConnector methods. > startAndStop(callables); > log.info("Finished stopping tasks in preparation for rebalance"); {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
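The ordering hazard described in the comment can be reproduced in isolation. Below is a minimal, self-contained Java sketch (invented names; this is not Connect code): a "revoke" is submitted to a pool and the caller moves on immediately, so the old task's UNASSIGNED status write can land after the new task's RUNNING write, leaving UNASSIGNED as the last record even though the task is running:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Minimal illustration (invented names; not Connect code) of the race: the
// revoke work is submitted to a pool and nobody waits for it, so the old
// task's UNASSIGNED write can happen after the new task's RUNNING write.
public class RevokeRaceDemo {

    public static String lastStatusWritten() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> lastStatus = new AtomicReference<>();

        // "Revoke" on the old worker: submitted, but the caller does not wait.
        pool.submit(() -> {
            sleep(100); // slow graceful stop of the revoked task
            lastStatus.set("UNASSIGNED");
        });

        // The "new" worker starts the task right away and records RUNNING.
        lastStatus.set("RUNNING");

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        return lastStatus.get(); // UNASSIGNED: written last, despite the task running
    }

    public static void main(String[] args) {
        System.out.println("final status in topic: " + lastStatusWritten());
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
    }
}
```

In the sketch the slow stop reliably loses the race because of the deliberate delay; in a real cluster the outcome depends on timing, which is exactly why the stale UNASSIGNED status appears only sometimes.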
[jira] [Resolved] (KAFKA-15005) Status of KafkaConnect task not correct
[ https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-15005. --- Resolution: Duplicate > Status of KafkaConnect task not correct > --- > > Key: KAFKA-15005 > URL: https://issues.apache.org/jira/browse/KAFKA-15005 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 3.0.0, 3.3.2 >Reporter: Yu Wang >Priority: Major > > Our MM2 is running version 2.5.1. > After a rebalance of our MM2 source tasks, we found there were several tasks > always in *UNASSIGNED* status, even though the real tasks had already started. > So we dumped the payload of the status topic of Kafka Connect, and found the > last two status changes were *RUNNING* followed by > {*}UNASSIGNED{*}. > {code:java} > LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643} > LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643} > {code} > But usually, the RUNNING status should be appended after the UNASSIGNED one, > because the worker coordinator revokes tasks before starting new tasks. > Then we checked the logs of our MM2 workers and found that, during that time, > there was a task that was revoked on worker-2 and started on worker-1. 
> > Worker-1 > {code:java} > [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, > groupId=__group] Starting task task-7 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [2023-05-15 09:24:45,951] INFO Creating task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > Worker-2 > {code:java} > [2023-05-15 09:24:40,922] INFO Stopping task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > > So I think the incorrect status was caused by the revoked task finishing later > than the newly started task, which made the UNASSIGNED status get appended to > the status topic after the RUNNING status. > > After reading the code of DistributedHerder, I found that task revocation runs > in a thread pool, and the revoke operation just returns after submitting all > the callables. So I think even on the same worker, there is no guarantee > that the revoke operation will always finish before the new tasks start. > {code:java} > for (final ConnectorTaskId taskId : tasks) { > callables.add(getTaskStoppingCallable(taskId)); > } > // The actual timeout for graceful task/connector stop is applied in worker's > // stopAndAwaitTask/stopAndAwaitConnector methods. > startAndStop(callables); > log.info("Finished stopping tasks in preparation for rebalance"); {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15239) producerPerformance system test for old client failed after v3.5.0
[ https://issues.apache.org/jira/browse/KAFKA-15239?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746253#comment-17746253 ] Sagar Rao commented on KAFKA-15239: --- [~showuon], I believe we have these tool migration guidelines: https://cwiki.apache.org/confluence/display/KAFKA/KIP-906%3A+Tools+migration+guidelines. I also think some of the tool migration efforts are following these. Federico might have more inputs on this. > producerPerformance system test for old client failed after v3.5.0 > -- > > Key: KAFKA-15239 > URL: https://issues.apache.org/jira/browse/KAFKA-15239 > Project: Kafka > Issue Type: Test > Components: system tests >Affects Versions: 3.6.0 >Reporter: Luke Chen >Priority: Major > > While running the producer performance tool in a system test for an old client (ex: > quota_test), we try to run with the dev branch's jar file, to make sure > it is backward compatible, as described > [here|https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/performance/producer_performance.py#L86-L88]. > {code:java} > # In order to ensure more consistent configuration between versions, always > use the ProducerPerformance tool from the development branch {code} > > But in KAFKA-14525, we're moving tools from the core module to a separate tools > module, which actually breaks backward compatibility. We should fix the > system test. Also, maybe we should document this backward > compatibility issue somewhere? > Note: > This is the command run in the system test. Suppose it's testing the old 3.4.0 client > (files put under `~/Downloads/kafka_2.13-3.4.0` in my env) and running under > the latest trunk env. 
> {code:java} > > for file in ./tools/build/libs/kafka-tools*.jar; do > > CLASSPATH=$CLASSPATH:$file; done; for file in > > ./tools/build/dependant-libs*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; > > export CLASSPATH; export > > KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:config/tools-log4j.properties"; > > KAFKA_OPTS= KAFKA_HEAP_OPTS="-XX:+HeapDumpOnOutOfMemoryError" > > ~/Downloads/kafka_2.13-3.4.0/bin/kafka-run-class.sh > > org.apache.kafka.tools.ProducerPerformance --topic test_topic --num-records > > 5 --record-size 3000 --throughput -1 --producer-props > > bootstrap.servers=localhost:9092 client.id=overridden_id > Exception in thread "main" java.lang.NoClassDefFoundError: > org/apache/kafka/common/utils/ThroughputThrottler > at > org.apache.kafka.tools.ProducerPerformance.start(ProducerPerformance.java:101) > at > org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:52) > Caused by: java.lang.ClassNotFoundException: > org.apache.kafka.common.utils.ThroughputThrottler > at > java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) > at > java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) > at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:520) > ... 2 more > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-10457) JsonConverter.toConnectData trims BigInteger to Long for schema-less case
[ https://issues.apache.org/jira/browse/KAFKA-10457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746198#comment-17746198 ] Sagar Rao commented on KAFKA-10457: --- [~yash.mayya], IIRC, you had worked on an issue related to data type conversions. Does this look similar? You might want to take a stab at it since you are already aware of the issue. Let me know. > JsonConverter.toConnectData trims BigInteger to Long for schema-less case > - > > Key: KAFKA-10457 > URL: https://issues.apache.org/jira/browse/KAFKA-10457 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Oleksandr Diachenko >Assignee: Oleksandr Diachenko >Priority: Critical > > > When _JsonConverter_ is configured with _schemas.enable=false_ and a value > exceeding _Long_ is passed, the result is incorrect since the converter > trims it to _Long_: > {code:java} > Map<String, Boolean> props = Collections.singletonMap("schemas.enable", > false); > converter.configure(props, true); > BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new > BigInteger("1")); > String msg = value.toString(); > SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, > msg.getBytes()); > assertNull(schemaAndValue.schema()); > assertEquals(value, schemaAndValue.value()); > {code} > > Fails with: > > {code:java} > expected:<9223372036854775808> but was:<-9223372036854775808> > Expected :9223372036854775808 > Actual :-9223372036854775808 > {code} > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746197#comment-17746197 ] Sagar Rao edited comment on KAFKA-12283 at 7/24/23 5:02 AM: The flakiness with this test was fixed as part of this change=> https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L273. We haven't seen flaky behaviour since this PR got merged. Closing. cc [~showuon] was (Author: sagarrao): The flakiness with this test was also fixed as part of this PR=> https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L273. Closing. cc [~showuon] > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-12283. --- Resolution: Fixed > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746197#comment-17746197 ] Sagar Rao commented on KAFKA-12283: --- The flakiness with this test was also fixed as part of this PR=> https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L273. Closing. cc [~showuon] > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > 
org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-8391. -- Resolution: Fixed Fixed with https://github.com/apache/kafka/pull/12561 > Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector > --- > > Key: KAFKA-8391 > URL: https://issues.apache.org/jira/browse/KAFKA-8391 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Sagar Rao >Priority: Critical > Labels: flaky-test > Attachments: 100-gradle-builds.tar > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not stop in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746195#comment-17746195 ] Sagar Rao edited comment on KAFKA-8391 at 7/24/23 5:00 AM: --- Just bumped into this. The issue mentioned above (https://issues.apache.org/jira/browse/KAFKA-12495) was fixed last year and this specific test was re-enabled in https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L191. We haven't noticed flakiness around this test since then. Closing this one. cc [~showuon] was (Author: sagarrao): Just bumped into this. The issue mentioned above (https://issues.apache.org/jira/browse/KAFKA-12495) was fixed last year and this specific test was re-enabled in https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L191. Closing this one. cc [~showuon] > Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector > --- > > Key: KAFKA-8391 > URL: https://issues.apache.org/jira/browse/KAFKA-8391 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Sagar Rao >Priority: Critical > Labels: flaky-test > Attachments: 100-gradle-builds.tar > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not stop in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17746195#comment-17746195 ] Sagar Rao commented on KAFKA-8391: -- Just bumped into this. The issue mentioned above (https://issues.apache.org/jira/browse/KAFKA-12495) was fixed last year and this specific test was re-enabled in https://github.com/apache/kafka/pull/12561/files#diff-7c0517709aeed38eadffb16dc43b7fc3ba8a7e9b3069f3c5a5c423caf6473470L191. Closing this one. cc [~showuon] > Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector > --- > > Key: KAFKA-8391 > URL: https://issues.apache.org/jira/browse/KAFKA-8391 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Sagar Rao >Priority: Critical > Labels: flaky-test > Attachments: 100-gradle-builds.tar > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not stop in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-8391: Assignee: Sagar Rao > Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector > --- > > Key: KAFKA-8391 > URL: https://issues.apache.org/jira/browse/KAFKA-8391 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Sagar Rao >Priority: Critical > Labels: flaky-test > Attachments: 100-gradle-builds.tar > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not stop in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15229: -- Description: The Kafka Connect config [task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms] has a default value of 5s. As per its definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shut down. Also, if multiple tasks are to be shut down, they are waited upon sequentially. The default value of this config is fine for smaller clusters with fewer tasks, but on a larger cluster the timeout can elapse and we will see a lot of messages of the form {noformat} Graceful stop of task failed. {noformat} In case of failure in graceful stop of tasks, the tasks are cancelled, which means that they won't send out a status update. Once that happens there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use the Incremental Cooperative Assignor, then the task wouldn't be reassigned until the scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, the task would show up with status RUNNING whenever its status is queried. This can be confusing for users. This problem can be exacerbated in cloud environments (like Kubernetes pods) because there is a high chance that the RUNNING status would be associated with an older worker_id that no longer exists in the cluster. While the net effect of all of this is not catastrophic, i.e. it won't lead to any processing delays or loss of data, the status of the task would be wrong. 
And if fast rebalances are happening under the Incremental Cooperative Assignor, that duration could be long as well. So, the proposal is to increase the default value. I am thinking we can set it to 60s because, as far as I can see, it doesn't interfere with any other timeout that we have. Also, while users can set this config on their clusters, it is a low-importance config and generally goes unnoticed, so I believe we should increase the default value. I am tagging this as needs-kip because I believe we will need one. was: The Kafka Connect config [task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms] has a default value of 5s. As per its definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shut down. Also, if multiple tasks are to be shut down, they are waited upon sequentially. The default value of this config is fine for smaller clusters with fewer tasks, but on a larger cluster the timeout can elapse and we will see a lot of messages of the form {noformat} Graceful stop of task failed. {noformat} In case of failure in graceful stop of tasks, the tasks are cancelled, which means that they won't send out a status update. Once that happens there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use the Incremental Cooperative Assignor, then the task wouldn't be reassigned until the scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, the task would show up with status RUNNING whenever its status is queried. This can be confusing for users. 
This problem can be exacerbated in cloud environments (like Kubernetes pods) because there is a high chance that the RUNNING status would be associated with an older worker_id that no longer exists in the cluster. While the net effect of all of this is not catastrophic, i.e. it won't lead to any processing delays or loss of data, the status of the task would be wrong. And if fast rebalances are happening under the Incremental Cooperative Assignor, that duration could be long as well. So, the proposal is to increase the default value. I am thinking we can set it to 60s because, as far as I can see, it doesn't interfere with any other timeout that we have. I am tagging this as needs-kip because I believe we will need one. > Increase default value of task.shutdown.graceful.timeout.ms > --- > > Key: KAFKA-15229 > URL: https://issues.apache.org/jira/browse/KAFKA-15229 > Project: Kafka >
[jira] [Updated] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms
[ https://issues.apache.org/jira/browse/KAFKA-15229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15229: -- Description: The Kafka Connect config [task.shutdown.graceful.timeout.ms. |https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms]has a default value of 5s. As per it's definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shutdown. Also, if multiple tasks are to be shutdown then, they are waited upon sequentially. Now the default value of this config is ok for smaller clusters with less number of tasks, on a larger cluster because the timeout can elapse we will see a lot of messages of the form {noformat} Graceful stop of task failed. {noformat} In case of failure in graceful stop of tasks, the tasks are cancelled which means that they won't send out a status update. Once that happens there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use Incremental Cooperative Assignor, then the task wouldn't be reassigned until scheduled.rebalance.delay.max.ms interval elapses. So, for that amount of duration, the task would show up with status RUNNING whenever it's status is queried for. This can be confusing for the users. This problem can be exacerbated on cloud environments(like kubernetes pods) because there is a high chance that the running status would be associated with an older worker_id which doesn't even exist in the cluster anymore. While the net effect of all of this is not catastrophic i.e it won't lead to any processing delays or loss of data but the status of the task would be off. 
And if there are fast rebalances happening under the Incremental Cooperative Assignor, that duration could be high as well. So, the proposal is to increase the default value. I am thinking we can set it to 60s because, as far as I can see, it doesn't interfere with any other timeout that we have. I am tagging this as needs-kip because I believe we will need one. was: The Kafka Connect config [task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms] has a default value of 5s. As per its definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shut down. Also, if multiple tasks are to be shut down, they are waited upon sequentially. The default value of this config is fine for smaller clusters with fewer tasks, but on a larger cluster, because the timeout can elapse, we will see a lot of messages of the form ``` Graceful stop of task failed. ``` When the graceful stop of a task fails, the task is cancelled, which means it won't send out a status update. Once that happens, there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use the Incremental Cooperative Assignor, the task wouldn't be reassigned until the scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, the task would show up with status RUNNING whenever its status is queried. This can be confusing for users. This problem can be exacerbated in cloud environments (like Kubernetes pods) because there is a high chance that the running status would be associated with an older worker_id that doesn't even exist in the cluster anymore. 
While the net effect of all of this is not catastrophic (i.e., it won't lead to any processing delays or loss of data), the status of the task would be off. And if there are fast rebalances happening under the Incremental Cooperative Assignor, that duration could be high as well. So, the proposal is to increase the default value. I am thinking we can set it to 60s because, as far as I can see, it doesn't interfere with any other timeout that we have. I am tagging this as needs-kip because I believe we will need one. > Increase default value of task.shutdown.graceful.timeout.ms > --- > > Key: KAFKA-15229 > URL: https://issues.apache.org/jira/browse/KAFKA-15229 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > The Kafka Connect config
[jira] [Created] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms
Sagar Rao created KAFKA-15229: - Summary: Increase default value of task.shutdown.graceful.timeout.ms Key: KAFKA-15229 URL: https://issues.apache.org/jira/browse/KAFKA-15229 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Sagar Rao Assignee: Sagar Rao The Kafka Connect config [task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms] has a default value of 5s. As per its definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shut down. Also, if multiple tasks are to be shut down, they are waited upon sequentially. The default value of this config is fine for smaller clusters with fewer tasks, but on a larger cluster, because the timeout can elapse, we will see a lot of messages of the form ``` Graceful stop of task failed. ``` When the graceful stop of a task fails, the task is cancelled, which means it won't send out a status update. Once that happens, there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use the Incremental Cooperative Assignor, the task wouldn't be reassigned until the scheduled.rebalance.delay.max.ms interval elapses. So, for that duration, the task would show up with status RUNNING whenever its status is queried. This can be confusing for users. This problem can be exacerbated in cloud environments (like Kubernetes pods) because there is a high chance that the running status would be associated with an older worker_id that doesn't even exist in the cluster anymore. 
While the net effect of all of this is not catastrophic (i.e., it won't lead to any processing delays or loss of data), the status of the task would be off. And if there are fast rebalances happening under the Incremental Cooperative Assignor, that duration could be high as well. So, the proposal is to increase the default value. I am thinking we can set it to 60s because, as far as I can see, it doesn't interfere with any other timeout that we have. I am tagging this as needs-kip because I believe we will need one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
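The "total, not per task" semantics described above can be sketched as follows. This is a hypothetical illustration, not Connect's actual Worker code; `awaitAll` and its arguments are invented names. The point is that every task draws from one shared deadline, so on a large cluster the later tasks in the list are the ones most likely to miss the window and log "Graceful stop of task failed."

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class GracefulStop {
    // Wait on tasks one after another against a single shared budget,
    // mirroring how task.shutdown.graceful.timeout.ms is a total rather
    // than a per-task timeout. Returns the number of tasks that stopped
    // within the budget; the rest would be cancelled by the caller.
    public static int awaitAll(List<Future<?>> tasks, long totalTimeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(totalTimeoutMs);
        int stopped = 0;
        for (Future<?> task : tasks) {
            long remainingNs = deadline - System.nanoTime();
            if (remainingNs <= 0) {
                break; // budget exhausted: "Graceful stop of task failed." for the rest
            }
            try {
                task.get(remainingNs, TimeUnit.NANOSECONDS);
                stopped++;
            } catch (TimeoutException | InterruptedException | ExecutionException e) {
                break; // this task missed the shared window
            }
        }
        return stopped;
    }
}
```

With a 5s default and many slow-stopping tasks per worker, a single misbehaving task early in the list can consume the whole budget, which is the motivation for raising the default.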
[jira] [Assigned] (KAFKA-15218) NPE will be thrown while deleting topic and fetch from follower concurrently
[ https://issues.apache.org/jira/browse/KAFKA-15218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15218: - Assignee: Sagar Rao > NPE will be thrown while deleting topic and fetch from follower concurrently > > > Key: KAFKA-15218 > URL: https://issues.apache.org/jira/browse/KAFKA-15218 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.5.0 >Reporter: Luke Chen >Assignee: Sagar Rao >Priority: Major > > When deleting topics, we'll first clear all the remoteReplicaMap when > stopPartitions > [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/server/ReplicaManager.scala#L554]. > But this time, there might be fetch request coming from follower, and try to > check if the replica is eligible to be added into ISR > [here|https://github.com/apache/kafka/blob/2999168cde37142ae3a2377fe939d6b581e692b8/core/src/main/scala/kafka/cluster/Partition.scala#L1001]. > At this moment, NPE will be thrown. Although it's fine since this topic is > already deleted, it'd be better to avoid it happen. > > > {code:java} > java.lang.NullPointerException: Cannot invoke > "kafka.cluster.Replica.stateSnapshot()" because the return value of > "kafka.utils.Pool.get(Object)" is null at > kafka.cluster.Partition.isReplicaIsrEligible(Partition.scala:992) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.cluster.Partition.canAddReplicaToIsr(Partition.scala:974) > ~[kafka_2.13-3.5.0.jar:?]at > kafka.cluster.Partition.maybeExpandIsr(Partition.scala:947) > ~[kafka_2.13-3.5.0.jar:?]at > kafka.cluster.Partition.updateFollowerFetchState(Partition.scala:866) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.cluster.Partition.fetchRecords(Partition.scala:1361) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.server.ReplicaManager.read$1(ReplicaManager.scala:1164) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.server.ReplicaManager.$anonfun$readFromLocalLog$7(ReplicaManager.scala:1235) > ~[kafka_2.13-3.5.0.jar:?] 
at > scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) > ~[scala-library-2.13.10.jar:?] at > scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) > ~[scala-library-2.13.10.jar:?] at > scala.collection.AbstractIterable.foreach(Iterable.scala:933) > ~[scala-library-2.13.10.jar:?] at > kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:1234) > ~[kafka_2.13-3.5.0.jar:?]at > kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:1044) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:994) > ~[kafka_2.13-3.5.0.jar:?] at > kafka.server.KafkaApis.handle(KafkaApis.scala:181) ~[kafka_2.13-3.5.0.jar:?] > at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:76) > ~[kafka_2.13-3.5.0.jar:?] at java.lang.Thread.run(Thread.java:1623) [?:?] > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
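The fix direction suggested by the report (the NPE is harmless because the topic is already deleted, so the missing replica should simply be treated as ineligible) can be sketched in a simplified form. This is an illustrative stand-in, not the actual `kafka.cluster.Partition` code; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class IsrCheck {
    // Stand-in for the remote-replica map that stopPartitions() clears
    // when a topic is deleted.
    private final Map<Integer, Long> remoteReplicaOffsets = new ConcurrentHashMap<>();

    public void addReplica(int brokerId, long offset) {
        remoteReplicaOffsets.put(brokerId, offset);
    }

    public void clearForDeletion() {
        remoteReplicaOffsets.clear(); // topic-deletion path
    }

    // A follower fetch racing with deletion may look up a replica that was
    // just removed. Returning false for a missing entry avoids dereferencing
    // null (the NPE in the stack trace above) and is safe: the topic is gone.
    public boolean isReplicaIsrEligible(int brokerId, long leaderEndOffset) {
        Long offset = remoteReplicaOffsets.get(brokerId);
        if (offset == null) {
            return false; // replica map already cleared: not eligible, no NPE
        }
        return offset >= leaderEndOffset;
    }
}
```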
[jira] [Resolved] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect
[ https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-12525. --- Resolution: Fixed > Inaccurate task status due to status record interleaving in fast rebalances > in Connect > -- > > Key: KAFKA-12525 > URL: https://issues.apache.org/jira/browse/KAFKA-12525 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1 >Reporter: Konstantine Karantasis >Assignee: Sagar Rao >Priority: Major > > When a task is stopped in Connect it produces an {{UNASSIGNED}} status record. > Equivalently, when a task is started or restarted in Connect it produces a {{RUNNING}} status record in the Connect status topic. > At the same time, rebalances are decoupled from task start and stop. These operations happen in a separate executor outside of the main worker thread that performs the rebalance. > Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by the worker that is sending them. This worker uses the {{StatusBackingStore#putSafe}} method, which will reject any stale status messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker is aware of the newer status record that declares a task as {{RUNNING}}. > In cases of fast consecutive rebalances where a task is revoked from one worker and assigned to another one, it has been observed that there is a small time window, and thus a race condition, during which a {{RUNNING}} status record in the new generation is produced and is immediately followed by a delayed {{UNASSIGNED}} status record belonging to the same or a previous generation, before the worker that sends this message reads the {{RUNNING}} status record that corresponds to the latest generation. > A couple of options are available to remediate this race condition. 
> For example, a worker that has started a task can re-write the {{RUNNING}} status message in the topic if it reads a stale {{UNASSIGNED}} message from a previous generation (that should have been fenced). > Another option is to ignore stale {{UNASSIGNED}} messages (messages from an earlier generation than the one in which the task had {{RUNNING}} status). > Worth noting that when this race condition takes place, besides the inaccurate status representation, the actual execution of the tasks remains unaffected (e.g. the tasks are running correctly even though they appear as {{UNASSIGNED}}). -- This message was sent by Atlassian Jira (v8.20.10#820010)
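The second remediation option, ignoring {{UNASSIGNED}} records from a generation no newer than the task's last observed {{RUNNING}} record, can be sketched like this. The class, field, and method names are hypothetical and simplified; this is not `StatusBackingStore`'s actual API, only an illustration of generation-based fencing.

```java
import java.util.HashMap;
import java.util.Map;

public class StatusStore {
    // Highest rebalance generation in which each task was reported RUNNING.
    private final Map<String, Integer> runningGeneration = new HashMap<>();
    private final Map<String, String> statuses = new HashMap<>();

    public void putRunning(String task, int generation) {
        runningGeneration.merge(task, generation, Math::max);
        statuses.put(task, "RUNNING");
    }

    // Reject a delayed UNASSIGNED record if a RUNNING record from the same
    // or a newer generation has already been observed (the fencing that the
    // race condition above bypasses when the record arrives too early).
    public boolean putUnassignedSafe(String task, int generation) {
        Integer running = runningGeneration.get(task);
        if (running != null && running >= generation) {
            return false; // stale record from an old generation: fenced
        }
        statuses.put(task, "UNASSIGNED");
        return true;
    }

    public String statusOf(String task) {
        return statuses.get(task);
    }
}
```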
[jira] [Updated] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
[ https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15127: -- Description: This has been listed as [Future Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] in KIP-875. Now that the delete offsets mechanism is also in place, we can take this up, which will allow connector names to be reused after connector deletion. Note that we can get started with this once the offsets management API has been adopted by a few users and we have got feedback around its functioning. was: This has been listed as [Future Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] in KIP-875. Now that the delete offsets mechanism is also in place, we can take this up which will allow connector names to be reused after connector deletion. > Allow offsets to be reset at the same time a connector is deleted. > -- > > Key: KAFKA-15127 > URL: https://issues.apache.org/jira/browse/KAFKA-15127 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Priority: Major > Labels: needs-kip > > This has been listed as [Future > Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] > in KIP-875. Now that the delete offsets mechanism is also in place, we can > take this up which will allow connector names to be reused after connector > deletion. > > Note that we can get started with this once the offsets management API has > been adopted by a few users and we have got feedback around its functioning. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
[ https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15127: - Assignee: (was: Sagar Rao) > Allow offsets to be reset at the same time a connector is deleted. > -- > > Key: KAFKA-15127 > URL: https://issues.apache.org/jira/browse/KAFKA-15127 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Priority: Major > Labels: needs-kip > > This has been listed as [Future > Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] > in KIP-875. Now that the delete offsets mechanism is also in place, we can > take this up which will allow connector names to be reused after connector > deletion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
[ https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17738704#comment-17738704 ] Sagar Rao commented on KAFKA-15127: --- I don't intend to work on this immediately or in the near future. It's just created for future reference. > Allow offsets to be reset at the same time a connector is deleted. > -- > > Key: KAFKA-15127 > URL: https://issues.apache.org/jira/browse/KAFKA-15127 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > This has been listed as [Future > Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] > in KIP-875. Now that the delete offsets mechanism is also in place, we can > take this up which will allow connector names to be reused after connector > deletion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
[ https://issues.apache.org/jira/browse/KAFKA-15127?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15127: - Assignee: Sagar Rao > Allow offsets to be reset at the same time a connector is deleted. > -- > > Key: KAFKA-15127 > URL: https://issues.apache.org/jira/browse/KAFKA-15127 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > This has been listed as [Future > Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] > in KIP-875. Now that the delete offsets mechanism is also in place, we can > take this up which will allow connector names to be reused after connector > deletion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
Sagar Rao created KAFKA-15127: - Summary: Allow offsets to be reset at the same time a connector is deleted. Key: KAFKA-15127 URL: https://issues.apache.org/jira/browse/KAFKA-15127 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Sagar Rao This has been listed as [Future Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] in KIP-875. Now that the delete offsets mechanism is also in place, we can take this up which will allow connector names to be reused after connector deletion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly
[ https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-14913. --- Resolution: Fixed > Migrate DistributedHerder Executor shutdown to use > ThreadUtils#shutdownExecutorServiceQuietly > - > > Key: KAFKA-14913 > URL: https://issues.apache.org/jira/browse/KAFKA-14913 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Minor > > Some context here: > https://github.com/apache/kafka/pull/13557#issuecomment-1509738740 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly
[ https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17733859#comment-17733859 ] Sagar Rao commented on KAFKA-14913: --- [~elkkhan] , the PR for this fix is already merged. [https://github.com/apache/kafka/pull/13594] For some reason, the ticket didn't get updated. > Migrate DistributedHerder Executor shutdown to use > ThreadUtils#shutdownExecutorServiceQuietly > - > > Key: KAFKA-14913 > URL: https://issues.apache.org/jira/browse/KAFKA-14913 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Minor > > Some context here: > https://github.com/apache/kafka/pull/13557#issuecomment-1509738740 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-14131) KafkaBasedLog#readToLogEnd() may accidentally fall into infinite loop
[ https://issues.apache.org/jira/browse/KAFKA-14131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-14131: - Assignee: Sambhav Jain (was: Sagar Rao) > KafkaBasedLog#readToLogEnd() may accidentally fall into infinite loop > -- > > Key: KAFKA-14131 > URL: https://issues.apache.org/jira/browse/KAFKA-14131 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Reporter: Justinwins >Assignee: Sambhav Jain >Priority: Major > > When a herder starts, its KafkaOffsetBackingStore will readToLogEnd() via a > DistributedHerder.herderExecutor thread named "Distrubuted-connect-", e.g. > Distrubuted-connect-28-1, which may take a few minutes. > If another thread tries to shut down this herder, it will block for > task.shutdown.graceful.timeout.ms before the > DistributedHerder.herderExecutor is interrupted. > And if the thread in DistributedHerder.herderExecutor is interrupted, > KafkaOffsetBackingStore.readToLogEnd() will poll(Integer.MAX_VALUE) and log > "Error polling" because the read has been interrupted; then > consumer.position will not advance, and readToLogEnd() falls into an infinite loop. 
> > {code:java}
> // code placeholder
> private void readToLogEnd() {
>     Set<TopicPartition> assignment = consumer.assignment();
>     Map<TopicPartition, Long> endOffsets = readEndOffsets(assignment);
>     log.trace("Reading to end of log offsets {}", endOffsets);
>     while (!endOffsets.isEmpty()) { // this loop will never jump out
>         Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
>         while (it.hasNext()) {
>             Map.Entry<TopicPartition, Long> entry = it.next();
>             TopicPartition topicPartition = entry.getKey();
>             long endOffset = entry.getValue();
>             // when the thread is in interrupted status, consumer.position will not advance
>             long lastConsumedOffset = consumer.position(topicPartition);
>             if (lastConsumedOffset >= endOffset) {
>                 log.trace("Read to end offset {} for {}", endOffset, topicPartition);
>                 it.remove();
>             } else {
>                 log.trace("Behind end offset {} for {}; last-read offset is {}",
>                         endOffset, topicPartition, lastConsumedOffset);
>                 // here, poll() will catch InterruptedException and log it without throwing it up
>                 poll(Integer.MAX_VALUE);
>                 break;
>             }
>         }
>     }
> } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
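One possible guard against the spin described above is to abandon the catch-up loop once the calling thread has been interrupted, instead of polling again with an unchanged position. The sketch below is a simplified, self-contained model of that idea, not the actual KafkaBasedLog code; the consumer and poll are replaced by a plain map of positions.

```java
import java.util.HashMap;
import java.util.Map;

public class LogCatchUp {
    // Simplified stand-in for consumer positions advancing toward end offsets.
    private final Map<String, Long> positions = new HashMap<>();

    // Returns true if all partitions reached their end offset, false if the
    // loop was abandoned because the calling thread was interrupted. Without
    // the interrupt check, a consumer whose poll() swallows InterruptedException
    // leaves its position unchanged and the loop spins forever.
    public boolean readToLogEnd(Map<String, Long> endOffsets) {
        while (!endOffsets.isEmpty()) {
            if (Thread.currentThread().isInterrupted()) {
                return false; // propagate shutdown instead of looping forever
            }
            // drop partitions that have caught up to their end offset
            endOffsets.entrySet().removeIf(e ->
                    positions.getOrDefault(e.getKey(), 0L) >= e.getValue());
            // stand-in for poll(): advance each lagging partition by one record
            for (String tp : endOffsets.keySet()) {
                positions.merge(tp, 1L, Long::sum);
            }
        }
        return true;
    }
}
```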
[jira] [Updated] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation
[ https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15041: -- Description: [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] allows source connectors to create topics even when the broker doesn't allow them to do so. It does this by checking, for every record, whether a topic needs to be created ([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]). To avoid always checking for topic presence via the admin API, it also maintains a cache of the topics it has created and doesn't create those anymore. This helps to create topics when brokers don't support automatic topic creation. However, let's say the topic gets created initially and later gets deleted while the connector is still running and the brokers don't support automatic topic creation. In such cases, the connector has cached the topic it already created and won't recreate it because the cache never updates; since the broker doesn't support topic creation, the logs would just be full of messages like {code:java} Error while fetching metadata with correlation id 3260 : {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code} This can become a problem in environments where brokers don't allow topic creation. We need a way to refresh the topics cache for such cases. was: [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] allows the source connectors to create topics even when the broker doesn't allow to do so. 
It does so by checking for every record if a topic needs to be created [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500.] To not always keep checking for topic presence via admin topics, it also maintains a cache of the topics that it has created and doesn't create those anymore. This helps to create topics when brokers don't support automatic topic creation. However, lets say the topic gets created initially and later on gets deleted while the connector is still running and the brokers don't support automatic topic creation. For such cases, the connector has cached the topic it has already created and wouldn't recreate it because the cache never updates and since the broker doesn't support topic creation, the logs would just be full of messages like {code:java} Error while fetching metadata with correlation id 3260 : {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code} This can become a problem on enviroments where brokers don't allow topic creation. We need a way to refresh the topics cache for such cases. > Source Connector auto topic creation fails when topic is deleted and brokers > don't support auto topic creation > -- > > Key: KAFKA-15041 > URL: https://issues.apache.org/jira/browse/KAFKA-15041 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] > allows the source connectors to create topics even when the broker doesn't > allow to do so. It does so by checking for every record if a topic needs to > be created > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500.] 
> To not always keep checking for topic presence via admin topics, it also > maintains a cache of the topics that it has created and doesn't create those > anymore. This helps to create topics when brokers don't support automatic > topic creation. > However, lets say the topic gets created initially and later on gets deleted > while the connector is still running and the brokers don't support automatic > topic creation. For such cases, the connector has cached the topic it has > already created and wouldn't recreate it because the cache never updates and > since the broker doesn't support topic creation, the logs would just be full > of messages like > > {code:java} > Error while fetching metadata with correlation id 3260 : > {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code} > > This can become a problem on environments where brokers don't allow topic > creation. We need a way to refresh the topics cache for such cases. -- This message was sent by Atlassian
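One way to refresh the topics cache, as the issue asks for, is to remember created topics only for a bounded time so that a topic deleted out-of-band is eventually re-checked (and re-created) via the admin API. The sketch below is a hypothetical TTL cache, not the Connect runtime's actual `topicCreation` bookkeeping; the class name, TTL approach, and injectable clock are all assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

public class CreatedTopicsCache {
    // Remember created topics only for ttlMs, so a topic deleted out-of-band
    // is eventually re-verified instead of being assumed to exist forever.
    private final Map<String, Long> createdAt = new HashMap<>();
    private final long ttlMs;
    private final LongSupplier clock; // injectable for testing

    public CreatedTopicsCache(long ttlMs, LongSupplier clock) {
        this.ttlMs = ttlMs;
        this.clock = clock;
    }

    public void markCreated(String topic) {
        createdAt.put(topic, clock.getAsLong());
    }

    // false -> the caller should verify (and possibly re-create) the topic
    // through the admin client before sending the next record to it.
    public boolean isKnownCreated(String topic) {
        Long t = createdAt.get(topic);
        if (t == null || clock.getAsLong() - t > ttlMs) {
            createdAt.remove(topic); // expired entry: force a re-check
            return false;
        }
        return true;
    }
}
```

The trade-off is extra admin requests once per TTL interval per topic, versus the current behavior where a stale cache entry suppresses re-creation indefinitely.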
[jira] [Updated] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation
[ https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-15041: -- Description: [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] allows source connectors to create topics even when the broker doesn't allow them to do so. It does this by checking, for every record, whether a topic needs to be created ([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]). To avoid always checking for topic presence via the admin API, it also maintains a cache of the topics it has created and doesn't create those anymore. This helps to create topics when brokers don't support automatic topic creation. However, let's say the topic gets created initially and later gets deleted while the connector is still running and the brokers don't support automatic topic creation. In such cases, the connector has cached the topic it already created and won't recreate it because the cache never updates; since the broker doesn't support topic creation, the logs would just be full of messages like {code:java} Error while fetching metadata with correlation id 3260 : {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code} This can become a problem in environments where brokers don't allow topic creation. We need a way to refresh the topics cache for such cases. was: [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] allows the source connectors to create topics even when the broker doesn't allow to do so. 
It does so by checking for every record if a topic needs to be created [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500.] To not always keep checking for topic presence via admin topics, it also maintains a cache of the topics that it has created and doesn't create those anymore. This helps to create topics when brokers don't support automatic topic creation. However, lets say the topic gets created initially and later on gets deleted while the connector is still running and the brokers don't support automatic topic creation. For such cases, the connector has cached the topic it has already created and wouldn't recreate it because the cache never updates and since the broker doesn't support topic creation, the logs would just be full of messages like ``` Error while fetching metadata with correlation id 3260 : \{connect-test=UNKNOWN_TOPIC_OR_PARTITION} ``` This can become a problem on enviroments where brokers don't allow topic creation. We need a way to refresh the topics cache for such cases. > Source Connector auto topic creation fails when topic is deleted and brokers > don't support auto topic creation > -- > > Key: KAFKA-15041 > URL: https://issues.apache.org/jira/browse/KAFKA-15041 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] > allows the source connectors to create topics even when the broker doesn't > allow to do so. It does so by checking for every record if a topic needs to > be created > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500.] 
> To not always keep checking for topic presence via admin topics, it also > maintains a cache of the topics that it has created and doesn't create those > anymore. This helps to create topics when brokers don't support automatic > topic creation. > However, lets say the topic gets created initially and later on gets deleted > while the connector is still running and the brokers don't support automatic > topic creation. For such cases, the connector has cached the topic it has > already created and wouldn't recreate it because the cache never updates and > since the broker doesn't support topic creation, the logs would just be full > of messages like > > {code:java} > Error while fetching metadata with correlation id 3260 : > {connect-test=UNKNOWN_TOPIC_OR_PARTITION}{code} > > This can become a problem on enviroments where brokers don't allow topic > creation. We need a way to refresh the topics cache for such cases. -- This message was sent by Atlassian Jira
[jira] [Commented] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation
[ https://issues.apache.org/jira/browse/KAFKA-15041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727894#comment-17727894 ] Sagar Rao commented on KAFKA-15041: --- Ideally a worker restart should fix this but it didn't quite happen on my local testing. > Source Connector auto topic creation fails when topic is deleted and brokers > don't support auto topic creation > -- > > Key: KAFKA-15041 > URL: https://issues.apache.org/jira/browse/KAFKA-15041 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > > [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] > allows the source connectors to create topics even when the broker doesn't > allow to do so. It does so by checking for every record if a topic needs to > be created > [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500.] > To not always keep checking for topic presence via admin topics, it also > maintains a cache of the topics that it has created and doesn't create those > anymore. This helps to create topics when brokers don't support automatic > topic creation. > However, lets say the topic gets created initially and later on gets deleted > while the connector is still running and the brokers don't support automatic > topic creation. For such cases, the connector has cached the topic it has > already created and wouldn't recreate it because the cache never updates and > since the broker doesn't support topic creation, the logs would just be full > of messages like > ``` > Error while fetching metadata with correlation id 3260 : > \{connect-test=UNKNOWN_TOPIC_OR_PARTITION} > ``` > > This can become a problem on enviroments where brokers don't allow topic > creation. We need a way to refresh the topics cache for such cases. 
-- This message was sent by Atlassian Jira (v8.20.10#820010)
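As the resolution above notes, a workaround is to lower the producer's max.block.ms so the send fails fast instead of blocking for Long.MAX_VALUE. A sketch of the two places the override can go (the 60-second value is illustrative, not a recommendation):

```properties
# Worker config (e.g. connect-distributed.properties): applies to all connectors
producer.max.block.ms=60000

# Or per connector, in the connector config; producer.override.* keys require
# connector.client.config.override.policy=All on the worker
producer.override.max.block.ms=60000
```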
[jira] [Created] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation
Sagar Rao created KAFKA-15041: - Summary: Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation Key: KAFKA-15041 URL: https://issues.apache.org/jira/browse/KAFKA-15041 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sagar Rao Assignee: Sagar Rao [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] allows source connectors to create topics even when the broker doesn't allow them to do so. It does so by checking, for every record, whether a topic needs to be created [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500.] To avoid repeatedly checking for topic presence via the admin API, it also maintains a cache of the topics that it has created and doesn't create those again. This lets it create topics when brokers don't support automatic topic creation. However, let's say the topic is created initially and later deleted while the connector is still running and the brokers don't support automatic topic creation. In that case, the connector has cached the topic it already created and won't recreate it because the cache never updates; and since the broker doesn't support topic creation, the logs just fill up with messages like ``` Error while fetching metadata with correlation id 3260 : \{connect-test=UNKNOWN_TOPIC_OR_PARTITION} ``` This can become a problem in environments where brokers don't allow topic creation. We need a way to refresh the topics cache for such cases. -- This message was sent by Atlassian Jira (v8.20.10#820010)
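The heuristic floated in the resolution (periodically re-check topic existence and refresh the cache) can be sketched as below. This is a hypothetical illustration, not the Connect runtime's actual `TopicCreation` code; the `topicExists` predicate stands in for a call like `Admin#describeTopics`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

// Sketch of a topic cache that re-verifies topic existence after a TTL
// instead of caching forever. The stale "already created" entry is
// exactly what the bug above describes.
class RefreshingTopicCache {
    private final Map<String, Long> verifiedAt = new ConcurrentHashMap<>();
    private final long ttlMs;
    private final Predicate<String> topicExists; // would wrap Admin#describeTopics

    RefreshingTopicCache(long ttlMs, Predicate<String> topicExists) {
        this.ttlMs = ttlMs;
        this.topicExists = topicExists;
    }

    /** Returns true if the topic should be (re)created before sending. */
    boolean needsCreation(String topic, long nowMs) {
        Long last = verifiedAt.get(topic);
        if (last != null && nowMs - last < ttlMs)
            return false;                  // trust the cache within the TTL
        if (topicExists.test(topic)) {     // TTL expired: re-check via admin
            verifiedAt.put(topic, nowMs);
            return false;
        }
        verifiedAt.remove(topic);          // topic was deleted underneath us
        return true;
    }
}
```

The trade-off is extra admin-API traffic once per TTL per topic, which is why the resolution calls this out as outside the runtime's scope.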
[jira] [Commented] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors
[ https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17727444#comment-17727444 ] Sagar Rao commented on KAFKA-15018: --- Thanks [~yash.mayya]. I have assigned this to myself. I went through your two approaches and feel that option 1 (the option also mentioned by Chris) might be the cleaner one to handle this case. While the second option is viable, it would need some more changes to have it incorporated correctly, as you pointed out. > Potential tombstone offsets corruption for exactly-once source connectors > - > > Key: KAFKA-15018 > URL: https://issues.apache.org/jira/browse/KAFKA-15018 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1 >Reporter: Chris Egerton >Assignee: Sagar Rao >Priority: Major > > When exactly-once support is enabled for source connectors, source offsets > can potentially be written to two different offsets topics: a topic specific > to the connector, and the global offsets topic (which was used for all > connectors prior to KIP-618 / version 3.3.0). > Precedence is given to offsets in the per-connector offsets topic, but if > none are found for a given partition, then the global offsets topic is used > as a fallback. > When committing offsets, a transaction is used to ensure that source records > and source offsets are written to the Kafka cluster targeted by the source > connector. This transaction only includes the connector-specific offsets > topic. Writes to the global offsets topic take place after writes to the > connector-specific offsets topic have completed successfully, and if they > fail, a warning message is logged, but no other action is taken. > Normally, this ensures that, for offsets committed by exactly-once-supported > source connectors, the per-connector offsets topic is at least as up-to-date > as the global offsets topic, and sometimes even ahead. 
> However, for tombstone offsets, we lose that guarantee. If a tombstone offset > is successfully written to the per-connector offsets topic, but cannot be > written to the global offsets topic, then the global offsets topic will still > contain that source offset, but the per-connector topic will not. Due to the > fallback-on-global logic used by the worker, if a task requests offsets for > one of the tombstoned partitions, the worker will provide it with the offsets > present in the global offsets topic, instead of indicating to the task that > no offsets can be found. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15018) Potential tombstone offsets corruption for exactly-once source connectors
[ https://issues.apache.org/jira/browse/KAFKA-15018?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-15018: - Assignee: Sagar Rao > Potential tombstone offsets corruption for exactly-once source connectors > - > > Key: KAFKA-15018 > URL: https://issues.apache.org/jira/browse/KAFKA-15018 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.3.0, 3.4.0, 3.3.1, 3.3.2, 3.5.0, 3.4.1 >Reporter: Chris Egerton >Assignee: Sagar Rao >Priority: Major > > When exactly-once support is enabled for source connectors, source offsets > can potentially be written to two different offsets topics: a topic specific > to the connector, and the global offsets topic (which was used for all > connectors prior to KIP-618 / version 3.3.0). > Precedence is given to offsets in the per-connector offsets topic, but if > none are found for a given partition, then the global offsets topic is used > as a fallback. > When committing offsets, a transaction is used to ensure that source records > and source offsets are written to the Kafka cluster targeted by the source > connector. This transaction only includes the connector-specific offsets > topic. Writes to the global offsets topic take place after writes to the > connector-specific offsets topic have completed successfully, and if they > fail, a warning message is logged, but no other action is taken. > Normally, this ensures that, for offsets committed by exactly-once-supported > source connectors, the per-connector offsets topic is at least as up-to-date > as the global offsets topic, and sometimes even ahead. > However, for tombstone offsets, we lose that guarantee. If a tombstone offset > is successfully written to the per-connector offsets topic, but cannot be > written to the global offsets topic, then the global offsets topic will still > contain that source offset, but the per-connector topic will not. 
Due to the > fallback-on-global logic used by the worker, if a task requests offsets for > one of the tombstoned partitions, the worker will provide it with the offsets > present in the global offsets topic, instead of indicating to the task that > no offsets can be found. -- This message was sent by Atlassian Jira (v8.20.10#820010)
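The failure mode described above can be sketched in a few lines. The class and method names are illustrative, not the Connect worker's actual internals; the point is that a tombstone which only reaches the per-connector store makes the fallback resurrect the stale global offset.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the fallback-on-global lookup described above. A tombstone
// removes the key from the per-connector store only, so the lookup falls
// through to the stale global store and "resurrects" the offset.
class OffsetLookup {
    final Map<String, Map<String, Object>> connectorStore = new HashMap<>();
    final Map<String, Map<String, Object>> globalStore = new HashMap<>();

    Map<String, Object> offsetsFor(String partition) {
        Map<String, Object> local = connectorStore.get(partition);
        if (local != null)
            return local;
        // Fallback: if the tombstone reached only the connector-specific
        // topic, this returns the stale offset instead of null.
        return globalStore.get(partition);
    }

    // Simulate a tombstone that succeeded on the per-connector topic but
    // failed (and was merely logged) on the global topic.
    void tombstoneConnectorTopicOnly(String partition) {
        connectorStore.remove(partition);
    }
}
```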
[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct
[ https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724615#comment-17724615 ] Sagar Rao commented on KAFKA-15005: --- Thank you [~LucentWong], I have updated the condition in the PR to include stale UNASSIGNED status from both the previous and current generation. > Status of KafkaConnect task not correct > --- > > Key: KAFKA-15005 > URL: https://issues.apache.org/jira/browse/KAFKA-15005 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 3.0.0, 3.3.2 >Reporter: Yu Wang >Priority: Major > > Our MM2 is running version 2.5.1. > After a rebalance of our MM2 source tasks, we found there were several tasks > always in *UNASSIGNED* status, even though the real tasks had already started. > So we dumped the payload of the status topic of Kafka Connect, and found that the > last two status changes were *RUNNING* followed by > {*}UNASSIGNED{*}. > {code:java} > LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643} > LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643} > {code} > But usually, the RUNNING status should be appended after the UNASSIGNED, > because the worker coordinator will revoke the tasks before starting new tasks. > Then we checked the logs of our MM2 workers and found that, during that time, > there was a task that was revoked on worker-2 and started on worker-1. 
> > Worker-1 > {code:java} > [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, > groupId=__group] Starting task task-7 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [2023-05-15 09:24:45,951] INFO Creating task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > Worker-2 > {code:java} > [2023-05-15 09:24:40,922] INFO Stopping task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > > So I think the incorrect status was caused by the revoked task finishing later > than the newly started task, which made the UNASSIGNED status get appended to the > status topic after the RUNNING status. > > After reading the code of DistributedHerder, I found that task revocation runs > in a thread pool, and the revoke operation just returns after submitting all > the callables. So I think even on the same worker, there is no guarantee > that the revoke operation will always finish before the new tasks start. > {code:java} > for (final ConnectorTaskId taskId : tasks) { > callables.add(getTaskStoppingCallable(taskId)); > } > // The actual timeout for graceful task/connector stop is applied in worker's > // stopAndAwaitTask/stopAndAwaitConnector methods. > startAndStop(callables); > log.info("Finished stopping tasks in preparation for rebalance"); {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
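The race described above can be reduced to a small concurrency sketch. The names here are illustrative, not the DistributedHerder API: the fix direction it shows is that blocking on every stop Future before proceeding is what guarantees stop-before-start, while returning right after submit() re-introduces the reported race.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Stop callables run on a pool; unless the caller blocks on every Future,
// a revoked task can publish its UNASSIGNED status after the re-started
// task's RUNNING status.
class StopBeforeStart {
    static void stopAllAndAwait(ExecutorService pool, List<Callable<Void>> stops)
            throws Exception {
        List<Future<Void>> futures = new ArrayList<>();
        for (Callable<Void> stop : stops)
            futures.add(pool.submit(stop));
        // Waiting here is the ordering guarantee; without it, new task
        // starts can interleave with still-running stop callables.
        for (Future<Void> f : futures)
            f.get(5, TimeUnit.SECONDS);
    }
}
```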
[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct
[ https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17724481#comment-17724481 ] Sagar Rao commented on KAFKA-15005: --- Thanks for the comment [~LucentWong]. Yeah, in that PR I had avoided the current generation. One thing I wanted to check with you (since you have rightfully masked the worker_ids from the status topic dump): the RUNNING message that you see corresponds to worker_1, i.e. the worker where the task started, and the UNASSIGNED status message belongs to worker_2? I am pretty sure that is the case, but just want to be doubly sure. In that case, it should be very safe to add that check of generation equality as well. > Status of KafkaConnect task not correct > --- > > Key: KAFKA-15005 > URL: https://issues.apache.org/jira/browse/KAFKA-15005 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 3.0.0, 3.3.2 >Reporter: Yu Wang >Priority: Major > > Our MM2 is running version 2.5.1. > After a rebalance of our MM2 source tasks, we found there were several tasks > always in *UNASSIGNED* status, even though the real tasks had already started. > So we dumped the payload of the status topic of Kafka Connect, and found that the > last two status changes were *RUNNING* followed by > {*}UNASSIGNED{*}. > {code:java} > LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"RUNNING","trace":null,"worker_id":"x","generation":437643} > LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"UNASSIGNED","trace":null,"worker_id":"x","generation":437643} > {code} > But usually, the RUNNING status should be appended after the UNASSIGNED, > because the worker coordinator will revoke the tasks before starting new tasks. > Then we checked the logs of our MM2 workers and found that, during that time, > there was a task that was revoked on worker-2 and started on worker-1. 
> > Worker-1 > {code:java} > [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, > groupId=__group] Starting task task-7 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [2023-05-15 09:24:45,951] INFO Creating task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > Worker-2 > {code:java} > [2023-05-15 09:24:40,922] INFO Stopping task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > > So I think the incorrect status was caused by the revoked task finishing later > than the newly started task, which made the UNASSIGNED status get appended to the > status topic after the RUNNING status. > > After reading the code of DistributedHerder, I found that task revocation runs > in a thread pool, and the revoke operation just returns after submitting all > the callables. So I think even on the same worker, there is no guarantee > that the revoke operation will always finish before the new tasks start. > {code:java} > for (final ConnectorTaskId taskId : tasks) { > callables.add(getTaskStoppingCallable(taskId)); > } > // The actual timeout for graceful task/connector stop is applied in worker's > // stopAndAwaitTask/stopAndAwaitConnector methods. > startAndStop(callables); > log.info("Finished stopping tasks in preparation for rebalance"); {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-15005) Status of KafkaConnect task not correct
[ https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17723885#comment-17723885 ] Sagar Rao commented on KAFKA-15005: --- hey [~LucentWong], thanks for reporting this. This looks very similar to https://issues.apache.org/jira/browse/KAFKA-12525 and has been present for a while. There's also a PR submitted for that bug. > Status of KafkaConnect task not correct > --- > > Key: KAFKA-15005 > URL: https://issues.apache.org/jira/browse/KAFKA-15005 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 3.0.0, 3.3.2 >Reporter: Yu Wang >Priority: Major > > Our MM2 is running version 2.5.1. > After a rebalance of our MM2 source tasks, we found there were several tasks > always in *UNASSIGNED* status, even though the real tasks had already started. > So we dumped the payload of the status topic of Kafka Connect, and found that the > last two status changes were *RUNNING* followed by > {*}UNASSIGNED{*}. > {code:java} > LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"RUNNING","trace":null,"worker_id":"x","generation":437643} > LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"UNASSIGNED","trace":null,"worker_id":"x","generation":437643} > {code} > But usually, the RUNNING status should be appended after the UNASSIGNED, > because the worker coordinator will revoke the tasks before starting new tasks. > Then we checked the logs of our MM2 workers and found that, during that time, > there was a task that was revoked on worker-2 and started on worker-1. 
> > Worker-1 > {code:java} > [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, > groupId=__group] Starting task task-7 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [2023-05-15 09:24:45,951] INFO Creating task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > Worker-2 > {code:java} > [2023-05-15 09:24:40,922] INFO Stopping task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > > So I think the incorrect status was caused by the revoked task finishing later > than the newly started task, which made the UNASSIGNED status get appended to the > status topic after the RUNNING status. > > After reading the code of DistributedHerder, I found that task revocation runs > in a thread pool, and the revoke operation just returns after submitting all > the callables. So I think even on the same worker, there is no guarantee > that the revoke operation will always finish before the new tasks start. > {code:java} > for (final ConnectorTaskId taskId : tasks) { > callables.add(getTaskStoppingCallable(taskId)); > } > // The actual timeout for graceful task/connector stop is applied in worker's > // stopAndAwaitTask/stopAndAwaitConnector methods. > startAndStop(callables); > log.info("Finished stopping tasks in preparation for rebalance"); {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-13109) WorkerSourceTask is not enforcing the errors.retry.timeout and errors.retry.delay.max.ms parameters in case of a RetriableException during task.poll()
[ https://issues.apache.org/jira/browse/KAFKA-13109?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-13109: - Assignee: Sagar Rao > WorkerSourceTask is not enforcing the errors.retry.timeout and > errors.retry.delay.max.ms parameters in case of a RetriableException during > task.poll() > -- > > Key: KAFKA-13109 > URL: https://issues.apache.org/jira/browse/KAFKA-13109 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.8.0 >Reporter: Damien Gasparina >Assignee: Sagar Rao >Priority: Major > > It seems that {{errors.retry.timeout}} is not enforced if a > {{RetriableException}} is thrown in the {{poll()}} of a SourceTask. > Looking at the Kafka Connect source code: > * If a task throws a {{RetriableException}} during a {{poll()}}, the connect > runtime catches it and returns null: > [https://github.com/apache/kafka/blob/2.8.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L273-L277] > * Then, {{toSend}} is set to null, and the runtime continues the loop and > re-executes the next iteration of poll without any delay > [https://github.com/apache/kafka/blob/2.8.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L240-L246] > > This implies that, if the {{poll()}} throws a {{RetriableException}}: > * {{errors.retry.timeout}} is ignored and the task will retry indefinitely > * there is no delay between retries, since {{errors.retry.delay.max.ms}} > is also ignored, causing potentially high resource utilization and log flooding > > My understanding of > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect] > is that {{errors.retry.timeout}} and {{errors.retry.delay.max.ms}} should > be respected in case of a {{RetriableException}} during a Source Task > {{poll()}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
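What honoring those two settings around poll() could look like can be sketched as a bounded retry loop with capped exponential backoff. This is an illustrative sketch, not the Connect runtime's retry operator; a plain RuntimeException stands in for RetriableException, and the initial 50 ms backoff is an assumed value.

```java
import java.util.function.Supplier;

// Bounded retries with capped exponential backoff, in place of the
// unbounded, delay-free loop described in the report.
class BoundedRetry {
    static <T> T retry(Supplier<T> poll, long retryTimeoutMs, long maxDelayMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + retryTimeoutMs;
        long delayMs = 50; // initial backoff, doubled up to maxDelayMs
        while (true) {
            try {
                return poll.get();
            } catch (RuntimeException retriable) { // stands in for RetriableException
                if (System.currentTimeMillis() >= deadline)
                    throw retriable;              // errors.retry.timeout exhausted
                Thread.sleep(Math.min(delayMs, maxDelayMs));
                delayMs = Math.min(delayMs * 2, maxDelayMs); // errors.retry.delay.max.ms
            }
        }
    }
}
```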
[jira] [Commented] (KAFKA-13156) KafkaStatusBackingStore making incorrect assumption about order of task and connector delete events
[ https://issues.apache.org/jira/browse/KAFKA-13156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17722367#comment-17722367 ] Sagar Rao commented on KAFKA-13156: --- hey [~ChrisEgerton], I assigned this to myself (I hope that's ok). I see it has been open for a while. I can send out a PR for this. > KafkaStatusBackingStore making incorrect assumption about order of task and > connector delete events > --- > > Key: KAFKA-13156 > URL: https://issues.apache.org/jira/browse/KAFKA-13156 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nigel Liang >Assignee: Sagar Rao >Priority: Minor > > When a connector remove message is read from the status topic, > `KafkaStatusBackingStore` removes both the connector and associated tasks > from its connector and tasks cache. However, due to the potentially partitioned > nature of the status topic, there is no guarantee about the order of delivery > for the connector removal message and task status messages for the same > connector. If the connector is rapidly removed and recreated, the status > backing store may see the task running events before the connector removal, > in which case the running tasks would be removed from the tasks cache. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-13156) KafkaStatusBackingStore making incorrect assumption about order of task and connector delete events
[ https://issues.apache.org/jira/browse/KAFKA-13156?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-13156: - Assignee: Sagar Rao (was: Chris Egerton) > KafkaStatusBackingStore making incorrect assumption about order of task and > connector delete events > --- > > Key: KAFKA-13156 > URL: https://issues.apache.org/jira/browse/KAFKA-13156 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nigel Liang >Assignee: Sagar Rao >Priority: Minor > > When a connector remove message is read from the status topic, > `KafkaStatusBackingStore` removes both the connector and associated tasks > from its connector and tasks cache. However, due to the potentially partitioned > nature of the status topic, there is no guarantee about the order of delivery > for the connector removal message and task status messages for the same > connector. If the connector is rapidly removed and recreated, the status > backing store may see the task running events before the connector removal, > in which case the running tasks would be removed from the tasks cache. -- This message was sent by Atlassian Jira (v8.20.10#820010)
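Why the delivery order is unguaranteed can be made concrete with a toy keyed partitioner. The key names below are made up for illustration, and the hash is a simplification (Kafka's default partitioner actually uses murmur2, not `hashCode`); the point is only that records with different keys can land on different partitions, and Kafka orders records within a partition, never across partitions.

```java
// Connector-delete and task-status records use different keys, so they
// can map to different partitions of the status topic; consumers then
// see only per-partition order, which is the race described above.
class StatusTopicOrdering {
    static int partitionFor(String key, int numPartitions) {
        // Simplified keyed partitioning: non-negative hash modulo count.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

A record keyed `status-connector-conn1` and one keyed `status-task-conn1-0` may thus be consumed in either order, regardless of the order they were produced in.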
[jira] [Updated] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-14956: -- Labels: flaky-test (was: ) > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted > -- > > Key: KAFKA-14956 > URL: https://issues.apache.org/jira/browse/KAFKA-14956 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Yash Mayya >Priority: Major > Labels: flaky-test > > ``` > h4. Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > h4. Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150) > at > 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at 
app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at >
[jira] [Updated] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
[ https://issues.apache.org/jira/browse/KAFKA-14956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-14956: -- Component/s: KafkaConnect > Flaky test > org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted > -- > > Key: KAFKA-14956 > URL: https://issues.apache.org/jira/browse/KAFKA-14956 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Yash Mayya >Priority: Major > > ``` > h4. Error > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > h4. Stacktrace > org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. > Sink connector consumer group offsets should catch up to the topic end > offsets ==> expected: but was: > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291) > at > app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150) > at > 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) > at > app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) > at > app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at > app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) > at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) > at > app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) > at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) > at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) > at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) > at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) > at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) > at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) > at 
app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) > at > org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) > at >
[jira] [Updated] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
[ https://issues.apache.org/jira/browse/KAFKA-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-14971: -- Labels: flaky-test mirror-maker (was: ) > Flaky Test > org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs > -- > > Key: KAFKA-14971 > URL: https://issues.apache.org/jira/browse/KAFKA-14971 > Project: Kafka > Issue Type: Bug >Reporter: Sagar Rao >Priority: Major > Labels: flaky-test, mirror-maker > > The test > `org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs` > seems to be flaky. Found here: > [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests] > Ran locally against the [same PR|https://github.com/apache/kafka/pull/13594] and it passed. > > > {code:java} > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, > because it's explicitly defined on the target topic! ==> expected: <2000> but > was: <8640> > Stacktrace > org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, > because it's explicitly defined on the target topic! 
==> expected: <2000> but > was: <8640> > at > app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) > at > app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) > at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) > at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) > at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758) > at > app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325) > at > app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306) > at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296) > at > app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > at > java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) > at > java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) > at > app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) > at > app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) > at > 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) > at > app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) > at > app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) > at > app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) > at > app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) > at >
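[Editorial note] The assertion above fails inside `TestUtils.waitForCondition`, i.e. the test does not check the synced `delete.retention.ms` once but polls it until a timeout, because MirrorMaker propagates topic configs asynchronously; the flake means the timeout elapsed before the sync landed. A minimal sketch of that polling pattern follows. The class and method names (`WaitUtil.waitForCondition`) are hypothetical stand-ins, not the actual Kafka `TestUtils` implementation:

```java
import java.util.function.BooleanSupplier;

public class WaitUtil {
    /**
     * Poll a condition until it holds or the timeout elapses, mirroring the
     * retry-until-deadline pattern visible in the stack trace: assertions on
     * eventually-consistent state (such as synced topic configs) are retried
     * rather than checked a single time.
     */
    public static void waitForCondition(BooleanSupplier condition,
                                        long timeoutMs,
                                        long pollIntervalMs,
                                        String failureMessage) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return; // condition satisfied before the deadline
            }
            try {
                Thread.sleep(pollIntervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
        // deadline passed without the condition ever holding
        throw new AssertionError(failureMessage);
    }
}
```

Under this pattern, a flake like the one above is usually addressed by lengthening the timeout or by making the condition observe the authoritative state, rather than by retrying the whole test.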
[jira] [Created] (KAFKA-14997) JmxToolTest failing with initializationError
Sagar Rao created KAFKA-14997: - Summary: JmxToolTest failing with initializationError Key: KAFKA-14997 URL: https://issues.apache.org/jira/browse/KAFKA-14997 Project: Kafka Issue Type: Bug Reporter: Sagar Rao Assignee: Federico Valeri Noticed that JmxToolTest fails with ``` h4. Error java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested exception is: java.net.ConnectException: Connection timed out (Connection timed out)] h4. Stacktrace java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested exception is: java.net.ConnectException: Connection timed out (Connection timed out)] at javax.management.remote.rmi.RMIConnectorServer.newIOException(RMIConnectorServer.java:827) at javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:432) at org.apache.kafka.tools.JmxToolTest.startJmxAgent(JmxToolTest.java:337) at org.apache.kafka.tools.JmxToolTest.beforeAll(JmxToolTest.java:55) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128) at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:70) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$13(ClassBasedTestDescriptor.java:411) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:409) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:215) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:84) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.util.ArrayList.forEach(ArrayList.java:1259) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) at
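[Editorial note] For context on the bind failure above: `JmxToolTest.beforeAll` starts an RMI-based JMX connector server, and a root cause of "Connection refused to host: 40.117.157.99" is the RMI stub advertising an externally resolved address that the CI host cannot connect back to. The sketch below shows one common mitigation, pinning `java.rmi.server.hostname` to localhost and binding to a free ephemeral port; the class name `JmxAgentSketch` is hypothetical, and this is not necessarily the fix adopted for KAFKA-14997:

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.ServerSocket;
import java.rmi.registry.LocateRegistry;
import javax.management.MBeanServer;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXConnectorServerFactory;
import javax.management.remote.JMXServiceURL;

public class JmxAgentSketch {

    /** Ask the OS for a free ephemeral port instead of hard-coding one. */
    public static int freePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    /**
     * Start a JMX connector server bound to localhost. Advertising localhost
     * in the RMI stub avoids "Connection refused to host: <external IP>" when
     * the machine's resolved address is unreachable, as in the CI failure.
     */
    public static JMXConnectorServer startAgent(int port) throws IOException {
        System.setProperty("java.rmi.server.hostname", "localhost");
        LocateRegistry.createRegistry(port);
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://localhost:" + port + "/jmxrmi");
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        JMXConnectorServer server =
                JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
        server.start();
        return server;
    }
}
```

Note the remaining race: another process can grab the port between `freePort()` and `startAgent()`, which is itself a classic source of CI flakiness.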
[jira] [Updated] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
[ https://issues.apache.org/jira/browse/KAFKA-14971?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao updated KAFKA-14971: -- Description: The test testSyncTopicConfigs in `org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest` seems to be flaky. Found here: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests] Ran locally against the [same PR|https://github.com/apache/kafka/pull/13594] and it passed. {code:java} org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ==> expected: <2000> but was: <8640> Stacktrace org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ==> expected: <2000> but was: <8640> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153) at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758) at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325) at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296) at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151) at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at
[jira] [Created] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
Sagar Rao created KAFKA-14971: - Summary: Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs Key: KAFKA-14971 URL: https://issues.apache.org/jira/browse/KAFKA-14971 Project: Kafka Issue Type: Bug Reporter: Sagar Rao The test testSyncTopicConfigs in `org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest` seems to be flaky. Found here: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests] Ran locally against the [same PR|https://github.com/apache/kafka/pull/13594] and it passed. ``` h4. Error org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ==> expected: <2000> but was: <8640> h4. Stacktrace org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ==> expected: <2000> but was: <8640> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153) at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758) at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325) at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296) 
at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) at