[jira] [Created] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs
Sagar Rao created KAFKA-16604: - Summary: Deprecate ConfigDef.ConfigKey constructor from public APIs Key: KAFKA-16604 URL: https://issues.apache.org/jira/browse/KAFKA-16604 Project: Kafka Issue Type: Improvement Reporter: Sagar Rao Currently, one can create a ConfigKey either by invoking the public constructor directly and passing the result to a ConfigDef object, or by invoking one of the define methods. Having two ways can be confusing and can lead to errors, as was noticed in KAFKA-16592. We should ideally expose only one way to users, which IMO should be to create these objects only through the exposed define methods. This ticket is about marking the public constructor of ConfigKey as Deprecated first and then making it private eventually. -- This message was sent by Atlassian Jira (v8.20.10#820010)
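The pattern the ticket proposes can be sketched with a minimal, hypothetical stand-in for ConfigDef (the class and method names here are illustrative, not the real Kafka API): the key's constructor is private, so define() becomes the single entry point and new optional fields can be added without breaking callers.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the single-entry-point pattern; names mirror but do
// not reproduce the real ConfigDef/ConfigKey API.
public class MiniConfigDef {
    public static final class Key {
        public final String name;
        public final Object defaultValue;
        public final String documentation;

        // Private constructor: the only way to obtain a Key is via define().
        private Key(String name, Object defaultValue, String documentation) {
            this.name = name;
            this.defaultValue = defaultValue;
            this.documentation = documentation;
        }
    }

    private final Map<String, Key> keys = new LinkedHashMap<>();

    // Single public entry point. New optional fields can be added as extra
    // define() overloads without exposing another constructor to break.
    public MiniConfigDef define(String name, Object defaultValue, String documentation) {
        keys.put(name, new Key(name, defaultValue, documentation));
        return this;
    }

    public Key key(String name) {
        return keys.get(name);
    }
}
```

With only define() exposed, the two confusing construction paths described above collapse into one.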
[jira] [Created] (KAFKA-16592) ConfigKey constructor update can break clients using it
Sagar Rao created KAFKA-16592: - Summary: ConfigKey constructor update can break clients using it Key: KAFKA-16592 URL: https://issues.apache.org/jira/browse/KAFKA-16592 Project: Kafka Issue Type: Bug Reporter: Sagar Rao Assignee: Sagar Rao In [KAFKA-14957|https://issues.apache.org/jira/browse/KAFKA-14957], the constructor of ConfigDef.ConfigKey was updated to add a new argument called {*}alternativeString{*}. As part of the PR, new *define* methods were also added, which makes sense. However, since the constructor of *ConfigDef.ConfigKey* itself can be used directly by other clients which import the dependency, this can break all clients that were using the older constructor without the *alternativeString* argument. I bumped into this when I was testing the [kafka-connect-redis|https://github.com/jcustenborder/kafka-connect-redis/tree/master] connector. It starts up correctly against the official 3.7 release, but fails with the following error when run against a 3.8 snapshot {code:java} Caused by: java.lang.NoSuchMethodError: org.apache.kafka.common.config.ConfigDef$ConfigKey.<init>(Ljava/lang/String;Lorg/apache/kafka/common/config/ConfigDef$Type;Ljava/lang/Object;Lorg/apache/kafka/common/config/ConfigDef$Validator;Lorg/apache/kafka/common/config/ConfigDef$Importance;Ljava/lang/String;Ljava/lang/String;ILorg/apache/kafka/common/config/ConfigDef$Width;Ljava/lang/String;Ljava/util/List;Lorg/apache/kafka/common/config/ConfigDef$Recommender;Z)V at com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder.build(ConfigKeyBuilder.java:62) at com.github.jcustenborder.kafka.connect.redis.RedisConnectorConfig.config(RedisConnectorConfig.java:133) at com.github.jcustenborder.kafka.connect.redis.RedisSinkConnectorConfig.config(RedisSinkConnectorConfig.java:46) at com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector.config(RedisSinkConnector.java:73) at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:538) at 
org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$3(AbstractHerder.java:412) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more {code} The reason is that the connector uses another library called connect-utils, which invokes the old constructor [directly|https://github.com/jcustenborder/connect-utils/blob/master/connect-utils/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/ConfigKeyBuilder.java#L62]. Connector invocations are not expected to fail across versions, so this would cause confusion. One could argue that the constructor shouldn't be invoked directly instead of using the *define* methods, but there might be other clients doing the same. We should add the old constructor back, having it call the new one with *alternativeString* set to null. -- This message was sent by Atlassian Jira (v8.20.10#820010)
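The proposed fix is the standard binary-compatibility pattern: re-add the removed constructor as an overload that delegates to the new one with the added argument set to null. A minimal self-contained sketch of the pattern (this is not the real ConfigKey signature, which has many more parameters):

```java
// Hedged sketch of the binary-compatibility fix, not the actual ConfigKey
// class: the old constructor is restored and delegates to the new one,
// passing null for the field that was added in the newer release.
public class CompatKey {
    public final String name;
    public final String documentation;
    public final String alternativeString; // argument added in the newer version

    // New constructor, introduced in the newer release.
    public CompatKey(String name, String documentation, String alternativeString) {
        this.name = name;
        this.documentation = documentation;
        this.alternativeString = alternativeString;
    }

    // Old constructor, re-added so bytecode compiled against the previous
    // release keeps linking: the NoSuchMethodError disappears because the old
    // method descriptor exists in the class file again.
    public CompatKey(String name, String documentation) {
        this(name, documentation, null);
    }
}
```

Because the JVM resolves constructors by their exact descriptor at link time, recompiling callers is not required once the old overload is back.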
[jira] [Created] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.
Sagar Rao created KAFKA-16197: - Summary: Connect Worker poll timeout prints Consumer poll timeout specific warnings. Key: KAFKA-16197 URL: https://issues.apache.org/jira/browse/KAFKA-16197 Project: Kafka Issue Type: Bug Reporter: Sagar Rao Assignee: Sagar Rao When a worker's poll timeout expires in Connect, the log lines that we see are: {noformat} consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. {noformat} and the reason for leaving the group is {noformat} Member XX sending LeaveGroup request to coordinator XX due to consumer poll timeout has expired. {noformat} which is specific to consumers and not to Connect workers. The first log line is especially misleading because the config `max.poll.interval.ms` is not configurable for a Connect worker, and it could make someone believe that the logs were written for a sink connector and not for the Connect worker itself. Ideally, we should print something specific to Connect. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-16056) Worker poll timeout expiry can lead to Duplicate task assignments.
Sagar Rao created KAFKA-16056: - Summary: Worker poll timeout expiry can lead to Duplicate task assignments. Key: KAFKA-16056 URL: https://issues.apache.org/jira/browse/KAFKA-16056 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sagar Rao Assignee: Sagar Rao When a worker's poll timeout expires, it proactively leaves the group, which triggers a rebalance. Under normal scenarios, leaving the group would trigger a scheduled rebalance delay. One thing to note, however, is that the worker which left the group temporarily doesn't give up its assignments, and whatever tasks were running on it remain as is. When the scheduled rebalance delay elapses, it just gets back its assignments; given that there won't be any revocations, it should all work out fine. But there is an edge case here. Let's assume that a scheduled rebalance delay was already active on the group and, just before the follow-up rebalance due to the scheduled rebalance delay elapsing, one worker's poll timeout expires. At this point, a rebalance is imminent and the leader would track the assignments of the transiently departed worker as lost [here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255]. When [handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441] gets triggered, because the scheduled rebalance delay isn't reset yet, and if [this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473] condition passes, the leader would assume that it needs to reassign all the lost assignments, which it will. 
But because the worker whose poll timeout expired doesn't rescind its assignments, we would end up with duplicate assignments: one set on the original worker, which was already running the tasks and connectors, and another set on the remaining workers, which got the redistributed work. This could lead to task failures if a connector has been written in a way that expects no duplicate tasks running across the cluster. This edge case can also be encountered more frequently if the `rebalance.timeout.ms` config is set to a lower value. One approach could be to do something similar to https://issues.apache.org/jira/browse/KAFKA-9184, where upon coordinator discovery failure the worker gives up all its assignments and joins with an empty assignment. We could do something similar in this case as well. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15296) Allow committing offsets for Dropped records via SMTs
Sagar Rao created KAFKA-15296: - Summary: Allow committing offsets for Dropped records via SMTs Key: KAFKA-15296 URL: https://issues.apache.org/jira/browse/KAFKA-15296 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sagar Rao Assignee: Sagar Rao Currently the Connect runtime doesn't commit the offsets of records which have been dropped by an SMT. This can lead to issues if the dropped record's partition reflects a source partition and the connector depends upon the committed offsets to make progress; in such cases, the connector might just stall. We should enable committing offsets for dropped records as well. Note that today, if a record is dropped because exactly-once support is enabled and the connector chose to abort the batch containing the record, its offset is still committed. So there already exists a discrepancy in the way the runtime treats these dropped records. -- This message was sent by Atlassian Jira (v8.20.10#820010)
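For context, an SMT drops a record by returning null from its apply() method, and the runtime then skips the record, including (today) its offset commit. The sketch below uses a hypothetical, minimal stand-in for the real org.apache.kafka.connect.transforms.Transformation contract to show how dropped records' offsets never reach the committed set:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal, hypothetical stand-in for the Connect SMT contract: returning
// null from apply() drops the record. The real interface is
// org.apache.kafka.connect.transforms.Transformation; names here are toys.
public class DropDemo {
    interface Transform {
        String apply(String record); // null => record is dropped
    }

    // A toy filter-style SMT that drops records starting with "skip:".
    static final Transform FILTER = r -> r.startsWith("skip:") ? null : r;

    // Mimics the behavior described in the ticket: dropped records are
    // skipped, and their "offsets" (list indices here) are never committed,
    // which is what can stall a connector that relies on committed offsets.
    static List<Integer> committedOffsets(List<String> records, Transform t) {
        List<Integer> committed = new ArrayList<>();
        for (int offset = 0; offset < records.size(); offset++) {
            if (t.apply(records.get(offset)) != null) {
                committed.add(offset); // only surviving records commit
            }
        }
        return committed;
    }
}
```

The proposed change would add the dropped records' offsets to the committed set as well, so progress tracking no longer has gaps.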
[jira] [Resolved] (KAFKA-15005) Status of KafkaConnect task not correct
[ https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-15005. --- Resolution: Duplicate > Status of KafkaConnect task not correct > --- > > Key: KAFKA-15005 > URL: https://issues.apache.org/jira/browse/KAFKA-15005 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.5.1, 3.0.0, 3.3.2 >Reporter: Yu Wang >Priority: Major > > Our MM2 is running version 2.5.1. > After a rebalance of our MM2 source tasks, we found there were several tasks > always in *UNASSIGNED* status, even though the real tasks had already started. > So we dumped the payload of the status topic of Kafka Connect, and found that the > last two status changes were status *RUNNING* followed by status > {*}UNASSIGNED{*}. > {code:java} > LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643} > LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 > headerKeys: [] key: task-7 payload: > {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643} > {code} > But usually, the RUNNING status should be appended after the UNASSIGNED, > because the worker coordinator will revoke the tasks before starting new tasks. > Then we checked the log of our MM2 worker and found that, during that time, > there was a task that was revoked on worker-2 and started on worker-1. 
> > Worker-1 > {code:java} > [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, > groupId=__group] Starting task task-7 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > [2023-05-15 09:24:45,951] INFO Creating task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > Worker-2 > {code:java} > [2023-05-15 09:24:40,922] INFO Stopping task task-7 > (org.apache.kafka.connect.runtime.Worker) {code} > > So I think the incorrect status was caused by the revoked task finishing later > than the newly started task, which made the UNASSIGNED status get appended to the > status topic after the RUNNING status. > > After reading the code of DistributedHerder, I found that task revoking runs > in a thread pool; the revoke operation just returns after submitting all > the callables. So I think even on the same worker, there is no guarantee > that the revoke operation will always finish before the new tasks start. > {code:java} > for (final ConnectorTaskId taskId : tasks) { > callables.add(getTaskStoppingCallable(taskId)); > } > // The actual timeout for graceful task/connector stop is applied in worker's > // stopAndAwaitTask/stopAndAwaitConnector methods. > startAndStop(callables); > log.info("Finished stopping tasks in preparation for rebalance"); {code} > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
[ https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-12283. --- Resolution: Fixed > Flaky Test > RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining > > > Key: KAFKA-12283 > URL: https://issues.apache.org/jira/browse/KAFKA-12283 > Project: Kafka > Issue Type: Test > Components: KafkaConnect, unit tests >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > > https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809 > {quote} {{java.lang.AssertionError: Tasks are imbalanced: > localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, > seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, > seq-source12-3] > localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, > seq-source10-2] > localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, > seq-source10-3] > at org.junit.Assert.fail(Assert.java:89) > at org.junit.Assert.assertTrue(Assert.java:42) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362) > at > org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300) > at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290) > at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}} > {quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
[ https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-8391. -- Resolution: Fixed Fixed with https://github.com/apache/kafka/pull/12561 > Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector > --- > > Key: KAFKA-8391 > URL: https://issues.apache.org/jira/browse/KAFKA-8391 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Matthias J. Sax >Assignee: Sagar Rao >Priority: Critical > Labels: flaky-test > Attachments: 100-gradle-builds.tar > > > [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/] > {quote}java.lang.AssertionError: Condition not met within timeout 3. > Connector tasks did not stop in time. at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at > org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at > org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms
Sagar Rao created KAFKA-15229: - Summary: Increase default value of task.shutdown.graceful.timeout.ms Key: KAFKA-15229 URL: https://issues.apache.org/jira/browse/KAFKA-15229 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Sagar Rao Assignee: Sagar Rao The Kafka Connect config [task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms] has a default value of 5s. As per its definition: {noformat} Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time, not per task. All task have shutdown triggered, then they are waited on sequentially.{noformat} it is the total timeout for all tasks to shut down; if multiple tasks are to be shut down, they are waited upon sequentially. The default value of this config is OK for smaller clusters with fewer tasks, but on a larger cluster the timeout can elapse and we will see a lot of messages of the form {noformat} Graceful stop of task failed. {noformat} When the graceful stop of a task fails, the task is cancelled, which means it won't send out a status update; once that happens, there won't be any `UNASSIGNED` status message posted for that task. Let's say the task stop was triggered by a worker going down. If the cluster is configured to use the Incremental Cooperative Assignor, then the task wouldn't be reassigned until the scheduled.rebalance.max.delay.ms interval elapses. So, for that duration, the task would show up with status RUNNING whenever its status is queried. This can be confusing for users. The problem can be exacerbated in cloud environments (like Kubernetes pods) because there is a high chance that the RUNNING status would be associated with an older worker_id which doesn't even exist in the cluster anymore. 
While the net effect of all of this is not catastrophic, i.e., it won't lead to any processing delays or loss of data, the status of the task would be off. And if fast rebalances are happening under the Incremental Cooperative Assignor, that duration could be high as well. So, the proposal is to increase the default to a higher value. I am thinking we can set it to 60s because, as far as I can see, it doesn't interfere with any other timeout that we have. I am tagging this as needs-kip because I believe we will need one. -- This message was sent by Atlassian Jira (v8.20.10#820010)
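Until a new default lands, the same effect can be had per cluster by overriding the value in the worker configuration (60000 ms here matches the value suggested above; treat it as a starting point, not a recommendation):

```properties
# connect-distributed.properties (Connect worker config)
# Total time allowed for ALL tasks on this worker to stop gracefully,
# not a per-task budget; tasks are awaited sequentially.
task.shutdown.graceful.timeout.ms=60000
```

A larger value trades slower rebalances for fewer cancelled tasks and stale RUNNING statuses.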
[jira] [Resolved] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect
[ https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-12525. --- Resolution: Fixed > Inaccurate task status due to status record interleaving in fast rebalances > in Connect > -- > > Key: KAFKA-12525 > URL: https://issues.apache.org/jira/browse/KAFKA-12525 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1 >Reporter: Konstantine Karantasis >Assignee: Sagar Rao >Priority: Major > > When a task is stopped in Connect it produces an {{UNASSIGNED}} status > record. > Equivalently, when a task is started or restarted in Connect it produces a > {{RUNNING}} status record in the Connect status topic. > At the same time, rebalances are decoupled from task start and stop. These > operations happen in a separate executor outside of the main worker thread that > performs the rebalance. > Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by > the worker that is sending them. This worker uses the > {{StatusBackingStore#putSafe}} method, which will reject any stale status > messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker > is aware of a newer status record that declares the task as {{RUNNING}}. > In cases of fast consecutive rebalances, where a task is revoked from one > worker and assigned to another, it has been observed that there is a > small time window, and thus a race condition, during which a {{RUNNING}} status > record in the new generation is produced and is immediately followed by a > delayed {{UNASSIGNED}} status record belonging to the same or a previous > generation, before the worker that sends this message reads the {{RUNNING}} > status record that corresponds to the latest generation. > A couple of options are available to remediate this race condition. 
> For example, a worker that has started a task can re-write the {{RUNNING}} > status message in the topic if it reads a stale {{UNASSIGNED}} message from a > previous generation (one that should have been fenced). > Another option is to ignore stale {{UNASSIGNED}} messages (messages from an > earlier generation than the one in which the task had {{RUNNING}} status). > Worth noting that when this race condition takes place, besides the > inaccurate status representation, the actual execution of the tasks remains > unaffected (e.g. the tasks are running correctly even though they appear as > {{UNASSIGNED}}). -- This message was sent by Atlassian Jira (v8.20.10#820010)
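The second option (ignoring stale {{UNASSIGNED}} messages from an earlier generation) can be sketched as a generation check in the status store. This is a hedged, heavily simplified illustration, not the real StatusBackingStore#putSafe logic:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of generation-based fencing for task status records:
// a delayed UNASSIGNED record is dropped if a RUNNING record from the same
// or a newer generation has already been observed for that task.
public class StatusStore {
    record Status(String state, int generation) {}

    private final Map<String, Status> latest = new HashMap<>();

    // Returns true if the record was applied, false if fenced as stale.
    public boolean put(String taskId, String state, int generation) {
        Status cur = latest.get(taskId);
        boolean stale = "UNASSIGNED".equals(state)
                && cur != null
                && "RUNNING".equals(cur.state())
                && cur.generation() >= generation;
        if (stale) {
            return false; // delayed UNASSIGNED from an older generation: drop it
        }
        latest.put(taskId, new Status(state, generation));
        return true;
    }

    public String stateOf(String taskId) {
        return latest.get(taskId).state();
    }
}
```

With this check, the interleaving described above (RUNNING at generation N, then a delayed UNASSIGNED from generation N or earlier) leaves the task correctly reported as RUNNING.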
[jira] [Created] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.
Sagar Rao created KAFKA-15127: - Summary: Allow offsets to be reset at the same time a connector is deleted. Key: KAFKA-15127 URL: https://issues.apache.org/jira/browse/KAFKA-15127 Project: Kafka Issue Type: Improvement Components: KafkaConnect Reporter: Sagar Rao This has been listed as [Future Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] in KIP-875. Now that the delete offsets mechanism is also in place, we can take this up; it will allow connector names to be reused after connector deletion. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly
[ https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-14913. --- Resolution: Fixed > Migrate DistributedHerder Executor shutdown to use > ThreadUtils#shutdownExecutorServiceQuietly > - > > Key: KAFKA-14913 > URL: https://issues.apache.org/jira/browse/KAFKA-14913 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Minor > > Some context here: > https://github.com/apache/kafka/pull/13557#issuecomment-1509738740 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation
Sagar Rao created KAFKA-15041: - Summary: Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation Key: KAFKA-15041 URL: https://issues.apache.org/jira/browse/KAFKA-15041 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sagar Rao Assignee: Sagar Rao [KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] allows source connectors to create topics even when the brokers don't allow automatic topic creation. It does so by checking, for every record, whether a topic needs to be created ([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]). To avoid always checking for topic presence via the admin client, it also maintains a cache of the topics it has created and doesn't try to create those again. This works when brokers don't support automatic topic creation. However, let's say the topic gets created initially and later gets deleted while the connector is still running, and the brokers don't support automatic topic creation. In such cases, the connector has cached the topic it already created and won't recreate it, because the cache never updates; and since the broker doesn't support topic creation, the logs would just be full of messages like {noformat} Error while fetching metadata with correlation id 3260 : {connect-test=UNKNOWN_TOPIC_OR_PARTITION} {noformat} This can become a problem in environments where brokers don't allow topic creation. We need a way to refresh the topic cache for such cases. -- This message was sent by Atlassian Jira (v8.20.10#820010)
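One possible shape of the fix, sketched here as a hedged illustration rather than the actual Connect implementation: let the created-topics cache be invalidated when a send fails with an unknown-topic error, so the next record triggers re-creation. The topicExists predicate stands in for an admin-client describe call; all names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

// Hedged sketch of a refreshable created-topics cache (not the real
// AbstractWorkerSourceTask code): invalidate() would be wired to
// UNKNOWN_TOPIC_OR_PARTITION errors reported by the producer.
public class TopicCreationCache {
    private final Set<String> created = new HashSet<>();
    private final Predicate<String> topicExists; // stand-in for an admin lookup

    public TopicCreationCache(Predicate<String> topicExists) {
        this.topicExists = topicExists;
    }

    // Returns true if a create call is needed before producing to the topic.
    public boolean needsCreation(String topic) {
        if (created.contains(topic)) {
            return false; // cached: assume it still exists
        }
        if (!topicExists.test(topic)) {
            return true;
        }
        created.add(topic); // topic found on the broker: remember it
        return false;
    }

    public void markCreated(String topic) {
        created.add(topic);
    }

    // Called when the producer reports the topic as unknown: it was deleted
    // out from under us, so forget it and re-create it on the next record.
    public void invalidate(String topic) {
        created.remove(topic);
    }
}
```

The key design point is that the cache is no longer append-only: a delete observed at produce time flows back into it, breaking the stuck state described above.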
[jira] [Created] (KAFKA-14997) JmxToolTest failing with initializationError
Sagar Rao created KAFKA-14997: - Summary: JmxToolTest failing with initializationError Key: KAFKA-14997 URL: https://issues.apache.org/jira/browse/KAFKA-14997 Project: Kafka Issue Type: Bug Reporter: Sagar Rao Assignee: Federico Valeri Noticed that JmxToolTest fails with ``` h4. Error java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested exception is: java.net.ConnectException: Connection timed out (Connection timed out)] h4. Stacktrace java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: javax.naming.ServiceUnavailableException [Root exception is java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested exception is: java.net.ConnectException: Connection timed out (Connection timed out)] at javax.management.remote.rmi.RMIConnectorServer.newIOException(RMIConnectorServer.java:827) at javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:432) at org.apache.kafka.tools.JmxToolTest.startJmxAgent(JmxToolTest.java:337) at org.apache.kafka.tools.JmxToolTest.beforeAll(JmxToolTest.java:55) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128) at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:70) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$13(ClassBasedTestDescriptor.java:411) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:409) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:215) at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:84) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) at 
org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139) at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138) at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95) at java.util.ArrayList.forEach(ArrayList.java:1259) at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41) at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155) at
[jira] [Created] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
Sagar Rao created KAFKA-14971: - Summary: Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs Key: KAFKA-14971 URL: https://issues.apache.org/jira/browse/KAFKA-14971 Project: Kafka Issue Type: Bug Reporter: Sagar Rao The test `org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs` seems to be flaky. Found here: [https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]. Ran locally against the [same PR|https://github.com/apache/kafka/pull/13594] and it passed. h4. Error org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ==> expected: <2000> but was: <8640> h4. Stacktrace org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, because it's explicitly defined on the target topic! ==> expected: <2000> but was: <8640> at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197) at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182) at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153) at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758) at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325) at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296) 
at app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) at app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727) at app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131) at app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147) at app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45) at app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37) at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92) at app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217) at app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138) at app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68) at
[jira] [Created] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
Sagar Rao created KAFKA-14956: - Summary: Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted Key: KAFKA-14956 URL: https://issues.apache.org/jira/browse/KAFKA-14956 Project: Kafka Issue Type: Bug Reporter: Sagar Rao ``` h4. Error org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: but was: h4. Stacktrace org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. Sink connector consumer group offsets should catch up to the topic end offsets ==> expected: but was: at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211) at app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337) at app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318) at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291) at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150) at app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) at app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) at java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at
[jira] [Created] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
Sagar Rao created KAFKA-14938: - Summary: Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary Key: KAFKA-14938 URL: https://issues.apache.org/jira/browse/KAFKA-14938 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Sagar Rao The test seems to be failing with: ``` java.lang.AssertionError: Not enough records produced by source connector. Expected at least: 100 + but got 72 h4. Stacktrace java.lang.AssertionError: Not enough records produced by source connector. Expected at least: 100 + but got 72 at org.junit.Assert.fail(Assert.java:89) at org.junit.Assert.assertTrue(Assert.java:42) at org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100) at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63) at 
org.junit.runners.ParentRunner$4.run(ParentRunner.java:331) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329) at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293) at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306) at org.junit.runners.ParentRunner.run(ParentRunner.java:413) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58) at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40) at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60) at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36) at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24) at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33) at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94) at com.sun.proxy.$Proxy2.processTestClass(Unknown Source) at org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176) at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129) at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100) at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60) at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113) at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65) at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69) ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly
Sagar Rao created KAFKA-14913: - Summary: Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly Key: KAFKA-14913 URL: https://issues.apache.org/jira/browse/KAFKA-14913 Project: Kafka Issue Type: Improvement Reporter: Sagar Rao -- This message was sent by Atlassian Jira (v8.20.10#820010)
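For context, the quiet-shutdown pattern that `ThreadUtils#shutdownExecutorServiceQuietly` encapsulates can be sketched as follows. This is a standalone illustration of the pattern, not the actual `ThreadUtils` source; the class and method names here are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class QuietShutdownSketch {
    // Approximation of the pattern: stop accepting new tasks, wait briefly for
    // in-flight work, then force-cancel, never letting shutdown throw.
    public static void shutdownQuietly(ExecutorService executor, long timeout, TimeUnit unit) {
        if (executor == null)
            return;
        executor.shutdown(); // reject new tasks, let running tasks finish
        try {
            if (!executor.awaitTermination(timeout, unit)) {
                executor.shutdownNow(); // interrupt tasks that are still running
                executor.awaitTermination(timeout, unit);
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(() -> System.out.println("task ran"));
        shutdownQuietly(pool, 1, TimeUnit.SECONDS);
        System.out.println("terminated=" + pool.isTerminated());
    }
}
```

Centralizing this dance in one utility avoids each caller (like DistributedHerder) re-implementing the awaitTermination/shutdownNow/interrupt handling slightly differently.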
[jira] [Resolved] (KAFKA-14586) Move StreamsResetter to tools
[ https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-14586. --- Resolution: Fixed > Move StreamsResetter to tools > - > > Key: KAFKA-14586 > URL: https://issues.apache.org/jira/browse/KAFKA-14586 > Project: Kafka > Issue Type: Sub-task >Reporter: Mickael Maison >Assignee: Sagar Rao >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14851) Move StreamResetterTest to tools module
Sagar Rao created KAFKA-14851: - Summary: Move StreamResetterTest to tools module Key: KAFKA-14851 URL: https://issues.apache.org/jira/browse/KAFKA-14851 Project: Kafka Issue Type: Sub-task Reporter: Sagar Rao This came up as a suggestion here: [https://github.com/apache/kafka/pull/13127#issuecomment-1433155607] . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14734) Use CommandDefaultOptions in StreamsResetter
Sagar Rao created KAFKA-14734: - Summary: Use CommandDefaultOptions in StreamsResetter Key: KAFKA-14734 URL: https://issues.apache.org/jira/browse/KAFKA-14734 Project: Kafka Issue Type: Sub-task Reporter: Sagar Rao This came up as a suggestion here: [https://github.com/apache/kafka/pull/13127#issuecomment-1433155607] . -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14647) Move TopicFilter shared class
Sagar Rao created KAFKA-14647: - Summary: Move TopicFilter shared class Key: KAFKA-14647 URL: https://issues.apache.org/jira/browse/KAFKA-14647 Project: Kafka Issue Type: Sub-task Reporter: Sagar Rao Assignee: Federico Valeri -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-10652) Raft leader should flush accumulated writes after a min size is reached
[ https://issues.apache.org/jira/browse/KAFKA-10652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-10652. --- Resolution: Won't Fix Not sure this is needed anymore. > Raft leader should flush accumulated writes after a min size is reached > --- > > Key: KAFKA-10652 > URL: https://issues.apache.org/jira/browse/KAFKA-10652 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Assignee: Sagar Rao >Priority: Major > > In KAFKA-10601, we implemented linger semantics similar to the producer to > let the leader accumulate a batch of writes before fsyncing them to disk. > Currently the fsync is only based on the linger time, but it would be helpful > to make it size-based as well. In other words, if we accumulate a > configurable N bytes, then we should not wait for linger expiration and > should just fsync immediately. -- This message was sent by Atlassian Jira (v8.20.10#820010)
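The proposed semantics (flush on whichever bound is hit first, linger time or accumulated bytes) can be sketched with a small accumulator. This is purely illustrative and not the Raft leader's actual batching code; all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical accumulator: flush when EITHER the linger time expires OR a
// configurable byte threshold is reached, per the ticket's proposal.
public class FlushAccumulator {
    private final long lingerMs;
    private final int flushMinBytes;
    private final List<byte[]> pending = new ArrayList<>();
    private int pendingBytes = 0;
    private long firstAppendMs = -1;

    public FlushAccumulator(long lingerMs, int flushMinBytes) {
        this.lingerMs = lingerMs;
        this.flushMinBytes = flushMinBytes;
    }

    public void append(byte[] record, long nowMs) {
        if (firstAppendMs < 0)
            firstAppendMs = nowMs; // linger clock starts at first append
        pending.add(record);
        pendingBytes += record.length;
    }

    // Size-based trigger: once flushMinBytes accumulate, do not wait out the linger.
    public boolean shouldFlush(long nowMs) {
        if (pending.isEmpty())
            return false;
        return pendingBytes >= flushMinBytes || nowMs - firstAppendMs >= lingerMs;
    }

    /** Simulates the fsync; returns how many bytes were flushed. */
    public int flush() {
        int flushed = pendingBytes;
        pending.clear();
        pendingBytes = 0;
        firstAppendMs = -1;
        return flushed;
    }

    public static void main(String[] args) {
        FlushAccumulator acc = new FlushAccumulator(25, 1024);
        acc.append(new byte[100], 0);
        System.out.println(acc.shouldFlush(10)); // false: under both bounds
        acc.append(new byte[2000], 11);
        System.out.println(acc.shouldFlush(12)); // true: byte bound reached
        System.out.println(acc.flush());         // 2100
    }
}
```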
[jira] [Resolved] (KAFKA-13296) Verify old assignment within StreamsPartitionAssignor
[ https://issues.apache.org/jira/browse/KAFKA-13296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-13296. --- Resolution: Fixed > Verify old assignment within StreamsPartitionAssignor > - > > Key: KAFKA-13296 > URL: https://issues.apache.org/jira/browse/KAFKA-13296 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Assignee: Sagar Rao >Priority: Major > > `StreamsPartitionAssignor` is responsible for assigning partitions and tasks to > all StreamThreads within an application. > While it ensures not to assign a single partition/task to two threads, there > is limited verification of this. In particular, we had one incident in which > a zombie thread/consumer did not clean up its own internal state correctly due > to KAFKA-12983. This unclean zombie state meant that the _old assignment_ > reported to `StreamsPartitionAssignor` contained a single partition for two > consumers. As a result, both threads/consumers later revoked the same > partition and the zombie thread could commit its unclean work (even though it > should have been fenced), leading to duplicate output under EOS_v2. > We should consider adding a check to `StreamsPartitionAssignor` that the _old > assignment_ is valid, ie, no partition should be missing and no partition > should be assigned to two consumers. In that case, we should log the invalid > _old assignment_ and send an error code back to all consumers indicating > that they should shut down "unclean" (ie, without flushing or committing any > offsets or transactions). -- This message was sent by Atlassian Jira (v8.20.10#820010)
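The validity check suggested above could look roughly like the following. This is a hypothetical sketch: `duplicatedPartitions` and the `Map<String, List<String>>` shape are stand-ins for the real member-subscription types inside the assignor.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the proposed check: scan the "old assignment" reported
// by all members and collect any partition claimed by more than one consumer.
public class AssignmentValidator {
    /** Returns partitions owned by two or more consumers; empty means the old assignment is valid. */
    public static Set<String> duplicatedPartitions(Map<String, List<String>> oldAssignment) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicates = new HashSet<>();
        for (List<String> owned : oldAssignment.values())
            for (String tp : owned)
                if (!seen.add(tp)) // add() returns false when tp was seen before
                    duplicates.add(tp);
        return duplicates;
    }

    public static void main(String[] args) {
        Map<String, List<String>> old = Map.of(
            "consumer-1", List.of("topic-0", "topic-1"),
            "zombie-2", List.of("topic-1")); // topic-1 claimed twice
        // A non-empty result would be logged, and an error code sent back so
        // members shut down "unclean" without committing.
        System.out.println(duplicatedPartitions(old));
    }
}
```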
[jira] [Created] (KAFKA-14461) StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to check for active partitions seems brittle.
Sagar Rao created KAFKA-14461: - Summary: StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to check for active partitions seems brittle. Key: KAFKA-14461 URL: https://issues.apache.org/jira/browse/KAFKA-14461 Project: Kafka Issue Type: Bug Reporter: Sagar Rao Assignee: Sagar Rao The newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and hence is marked as Ignored. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individua
Sagar Rao created KAFKA-14454: - Summary: KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individually but not when run as part of the IT Key: KAFKA-14454 URL: https://issues.apache.org/jira/browse/KAFKA-14454 Project: Kafka Issue Type: Bug Reporter: Sagar Rao Assignee: Sagar Rao The newly added test KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions, added as part of KIP-837, passes when run individually but fails when run as part of the IT class, and hence is marked as Ignored. As part of this ticket, we can also look at moving this class to JUnit 5 annotations, since it currently relies on JUnit 4 ones. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies
Sagar Rao created KAFKA-14401: - Summary: Connector/Tasks reading offsets can get stuck if underneath WorkThread dies Key: KAFKA-14401 URL: https://issues.apache.org/jira/browse/KAFKA-14401 Project: Kafka Issue Type: Bug Reporter: Sagar Rao When a connector or task tries to read offsets from the offsets topic, it invokes the `OffsetStorageReaderImpl#offsets` method. This method obtains a Future from the underlying KafkaOffsetBackingStore, which in turn invokes `KafkaBasedLog#readToEnd` and passes along the Callback. That method essentially adds the Callback to a queue of callbacks being managed. Within KafkaBasedLog, there is a WorkThread which polls the callback queue and executes the callbacks in an infinite loop. However, there is an enclosing try/catch block around the while loop. If an exception is thrown which is not caught by any of the inner catch blocks, control reaches the outermost catch block and the WorkThread is terminated. The connectors/tasks are not aware of this, and they keep submitting callbacks to KafkaBasedLog with nobody left to process them. 
This can be seen in the thread dumps as well: ``` "task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s tid=0x7f8d9c037000 nid=0x5d00 waiting on condition [0x7f8dc08cd000] java.lang.Thread.State: WAITING (parking) at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method) - parking to wait for <0x00070345c9a8> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345) at java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232) at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101) at org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63) ``` We need a mechanism to restart the WorkThread if it dies. This could be done in the outermost catch block for example. -- This message was sent by Atlassian Jira (v8.20.10#820010)
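The restart mechanism proposed above can be sketched, independent of KafkaBasedLog's actual code, as a supervisor that re-enters the work body instead of letting the loop die on an uncaught exception. All names here are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the proposed fix: instead of letting the outermost catch block
// terminate the work loop for good, re-enter it so that callbacks submitted
// via readToEnd() keep being processed.
public class SupervisedLoop {
    /** Runs loopBody, restarting it after an uncaught exception, up to maxRestarts times. */
    public static int runWithRestarts(Runnable loopBody, int maxRestarts) {
        int attempt = 0;
        while (true) {
            attempt++;
            try {
                loopBody.run(); // the poll-callback-queue-forever body
                return attempt; // clean exit (e.g. shutdown requested)
            } catch (Throwable t) {
                if (attempt > maxRestarts)
                    throw new IllegalStateException("work loop kept dying, giving up", t);
                // log the failure and restart rather than dying silently
                System.err.println("work loop died, restarting: " + t);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        int attempts = runWithRestarts(() -> {
            if (runs.incrementAndGet() == 1)
                throw new RuntimeException("simulated unexpected failure");
        }, 3);
        System.out.println("succeeded after " + attempts + " attempts");
    }
}
```

Bounding the restarts (or backing off between them) matters in practice, so a persistently failing loop surfaces as a hard error instead of a tight crash-restart cycle.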
[jira] [Resolved] (KAFKA-14007) Connect header converters are never closed
[ https://issues.apache.org/jira/browse/KAFKA-14007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-14007. --- Resolution: Fixed > Connect header converters are never closed > -- > > Key: KAFKA-14007 > URL: https://issues.apache.org/jira/browse/KAFKA-14007 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Sagar Rao >Priority: Major > > The [HeaderConverter > interface|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L27] > extends {{Closeable}}, but {{HeaderConverter::close}} is never actually > invoked anywhere. We can and should start invoking it, probably wrapped in > [Utils::closeQuietly|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L999-L1010] > so that any invalid logic in that method for custom header converters that > has to date gone undetected will not cause new task failures. -- This message was sent by Atlassian Jira (v8.20.10#820010)
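The close-quietly wrapping recommended above looks roughly like the following. This is a standalone sketch of the pattern, not Kafka's actual `Utils::closeQuietly` source.

```java
// Sketch: invoke close() but swallow and log any exception, so a custom
// HeaderConverter with broken close() logic cannot fail the task on cleanup.
public class CloseQuietlySketch {
    public static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null)
            return; // tolerate converters that were never created
        try {
            closeable.close();
        } catch (Throwable t) {
            System.err.println("Failed to close " + name + ": " + t);
        }
    }

    public static void main(String[] args) {
        AutoCloseable badConverter = () -> { throw new IllegalStateException("broken close()"); };
        closeQuietly(badConverter, "header converter"); // exception swallowed, only logged
        closeQuietly(null, "missing converter");        // null tolerated
        System.out.println("task cleanup continued");
    }
}
```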
[jira] [Resolved] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause NPE
[ https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-14012. --- Resolution: Fixed > passing a "method" into the `Utils.closeQuietly` method cause NPE > - > > Key: KAFKA-14012 > URL: https://issues.apache.org/jira/browse/KAFKA-14012 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 3.2.0 >Reporter: Luke Chen >Assignee: Sagar Rao >Priority: Major > > The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. But > in some places we passed a "method" into Utils.closeQuietly, which > causes the object not to be closed as expected. > I found this in: > - WorkerConnector > - AbstractWorkerSourceTask > - KafkaConfigBackingStore > -- This message was sent by Atlassian Jira (v8.20.10#820010)
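A minimal illustration of the suspected failure mode follows. The assumption here (not confirmed by the ticket text) is that the NPE arises from binding an instance method reference like `obj::close` on a null receiver, which throws before `closeQuietly`'s own null check can run; passing the object itself is safe.

```java
// Illustration (assumption about the failure mode, not the Kafka source):
// `obj::close` evaluates `obj` eagerly, so a null receiver throws
// NullPointerException while the method reference is being created.
public class MethodRefNpe {
    static void closeQuietly(AutoCloseable closeable, String name) {
        if (closeable == null)
            return; // null receiver handled safely here
        try { closeable.close(); } catch (Throwable t) { /* log and ignore */ }
    }

    public static void main(String[] args) {
        AutoCloseable resource = null;

        closeQuietly(resource, "resource"); // fine: null check inside runs
        try {
            closeQuietly(resource::close, "resource"); // NPE at the call site
        } catch (NullPointerException e) {
            System.out.println("NPE from binding a method reference on a null target");
        }
    }
}
```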
[jira] [Created] (KAFKA-14040) Improve test coverage for max buffer bytes metrics
Sagar Rao created KAFKA-14040: - Summary: Improve test coverage for max buffer bytes metrics Key: KAFKA-14040 URL: https://issues.apache.org/jira/browse/KAFKA-14040 Project: Kafka Issue Type: Bug Components: streams Reporter: Sagar Rao Assignee: Sagar Rao In some EOS applications with relatively long restoration times we've noticed a series of ProducerFencedExceptions occurring during/immediately after restoration. The broker logs were able to confirm these were due to transactions timing out. In Streams, it turns out we automatically begin a new txn when calling {{send}} (if there isn’t already one in flight). A {{send}} occurs often outside a commit during active processing (eg writing to the changelog), leaving the txn open until the next commit. And if a StreamThread has been actively processing when a rebalance results in a new stateful task without revoking any existing tasks, the thread won’t actually commit this open txn before it goes back into the restoration phase while it builds up state for the new task. So the in-flight transaction is left open during restoration, during which the StreamThread only consumes from the changelog without committing, leaving it vulnerable to timing out when restoration times exceed the configured transaction.timeout.ms for the producer client. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException
[ https://issues.apache.org/jira/browse/KAFKA-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-13970. --- Resolution: Won't Fix > TopicAdmin topic creation should be retried on TimeoutException > --- > > Key: KAFKA-13970 > URL: https://issues.apache.org/jira/browse/KAFKA-13970 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Daniel Urban >Assignee: Sagar Rao >Priority: Major > > org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the > case when there aren't enough brokers in the cluster to create a topic with > the expected replication factor. This logic should also handle the case when > there are 0 brokers in the cluster, and should retry in that case. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Resolved] (KAFKA-13624) Add Metric for Store Cache Size
[ https://issues.apache.org/jira/browse/KAFKA-13624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-13624. --- Resolution: Won't Fix Taken care as part of KIP-770 > Add Metric for Store Cache Size > --- > > Key: KAFKA-13624 > URL: https://issues.apache.org/jira/browse/KAFKA-13624 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > KIP-770 introduced a new metric called `{*}input-buffer-bytes-total`{*} to > track the total amount of bytes accumulated by a task. While working through > it's PR, it was suggested to add a similar metric for > *cache-size-bytes-total* to track the cache size in bytes for a task. -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-9168. -- Resolution: Later This would be taken up once we have a more complete solution > Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Resolved] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"
[ https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-13152. --- Resolution: Done > Replace "buffered.records.per.partition" with "input.buffer.max.bytes" > --- > > Key: KAFKA-13152 > URL: https://issues.apache.org/jira/browse/KAFKA-13152 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Sagar Rao >Priority: Major > Labels: needs-kip > > The current config "buffered.records.per.partition" controls the maximum number > of records to bookkeep, and once it is exceeded we pause fetching from > this partition. However this config has two issues: > * It's a per-partition config, so the total memory consumed depends on > the dynamic number of partitions assigned. > * Record size can vary from case to case. > Hence it's hard to bound the memory usage for this buffering. We should > consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", > which controls how many bytes in total are allowed to be buffered. This is > doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.1#820001)
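For reference, switching to the global byte bound in a Streams app could look roughly like this. The config names are taken from this ticket; check your Kafka Streams version for availability and the exact `StreamsConfig` constants before relying on them.

```java
import java.util.Properties;

// Hedged sketch of the configuration change this ticket proposes.
public class InputBufferConfigSketch {
    public static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "demo-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Global bound: total bytes buffered across all assigned partitions,
        // independent of how many partitions the instance happens to own.
        props.put("input.buffer.max.bytes", String.valueOf(64L * 1024 * 1024));
        // Old per-partition record-count bound this ticket deprecates:
        // props.put("buffered.records.per.partition", "1000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("input.buffer.max.bytes"));
    }
}
```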
[jira] [Created] (KAFKA-13624) Add Metric for Store Cache Size
Sagar Rao created KAFKA-13624: - Summary: Add Metric for Store Cache Size Key: KAFKA-13624 URL: https://issues.apache.org/jira/browse/KAFKA-13624 Project: Kafka Issue Type: Improvement Components: streams Reporter: Sagar Rao Assignee: Sagar Rao The current config "buffered.records.per.partition" controls the maximum number of records to bookkeep, and once it is exceeded we pause fetching from this partition. However this config has two issues: * It's a per-partition config, so the total memory consumed depends on the dynamic number of partitions assigned. * Record size can vary from case to case. Hence it's hard to bound the memory usage for this buffering. We should consider deprecating that config in favor of a global one, e.g. "input.buffer.max.bytes", which controls how many bytes in total are allowed to be buffered. This is doable since we buffer the raw records in . -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Created] (KAFKA-12289) Add Tests for prefixScan in InMemoryKeyValueStore
Sagar Rao created KAFKA-12289: - Summary: Add Tests for prefixScan in InMemoryKeyValueStore Key: KAFKA-12289 URL: https://issues.apache.org/jira/browse/KAFKA-12289 Project: Kafka Issue Type: Improvement Components: streams, unit tests Reporter: Sagar Rao Assignee: Rohit Deshpande While reviewing KIP-614, it was decided that tests for [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588] need to be streamlined to use a mocked underlyingStore. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10789) Streamlining Tests in ChangeLoggingKeyValueBytesStoreTest
Sagar Rao created KAFKA-10789: - Summary: Streamlining Tests in ChangeLoggingKeyValueBytesStoreTest Key: KAFKA-10789 URL: https://issues.apache.org/jira/browse/KAFKA-10789 Project: Kafka Issue Type: Improvement Reporter: Sagar Rao While reviewing KIP-614, it was decided that tests for [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588] need to be streamlined to use a mocked underlyingStore. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest
Sagar Rao created KAFKA-10788: - Summary: Streamlining Tests in CachingInMemoryKeyValueStoreTest Key: KAFKA-10788 URL: https://issues.apache.org/jira/browse/KAFKA-10788 Project: Kafka Issue Type: Improvement Reporter: Sagar Rao While reviewing KIP-614, it was decided that tests for [CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588] need to be streamlined to use a mocked underlyingStore. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10767) Add Unit Test cases for missing methods in ThreadCacheTest
Sagar Rao created KAFKA-10767: - Summary: Add Unit Test cases for missing methods in ThreadCacheTest Key: KAFKA-10767 URL: https://issues.apache.org/jira/browse/KAFKA-10767 Project: Kafka Issue Type: Improvement Reporter: Sagar Rao Assignee: Sagar Rao During the code review for KIP-614, it was noticed that RocksDbRangeIterator does not have any unit test cases. Here is the github comment for reference: [https://github.com/apache/kafka/pull/9508#discussion_r527612942] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10766) Add Unit Test cases for RocksDbRangeIterator
Sagar Rao created KAFKA-10766: - Summary: Add Unit Test cases for RocksDbRangeIterator Key: KAFKA-10766 URL: https://issues.apache.org/jira/browse/KAFKA-10766 Project: Kafka Issue Type: Improvement Reporter: Sagar Rao Assignee: Sagar Rao During the code review for KIP-614, it was noticed that RocksDbRangeIterator does not have any unit test cases. Here is the github comment for reference: [https://github.com/apache/kafka/pull/9508#discussion_r527612942] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10648) Add Prefix Scan support to State Stores
Sagar Rao created KAFKA-10648: - Summary: Add Prefix Scan support to State Stores Key: KAFKA-10648 URL: https://issues.apache.org/jira/browse/KAFKA-10648 Project: Kafka Issue Type: Improvement Reporter: Sagar Rao Assignee: Sagar Rao This issue tracks the changes described in [https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores], which adds prefix scan support to state stores. Currently, only the RocksDB and in-memory key-value stores are supported. -- This message was sent by Atlassian Jira (v8.3.4#803005)
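To illustrate what a prefix scan means for a sorted key-value store (this is a self-contained sketch of the concept, not the KIP-614 API, which goes through Kafka's serializers and `KeyValueIterator`), a `TreeMap` supports the same seek-then-iterate pattern that RocksDB's sorted key space allows:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PrefixScanSketch {
    // Return all entries whose key starts with `prefix`. tailMap() seeks to
    // the first key >= prefix; because keys are sorted, we can stop as soon
    // as a key no longer carries the prefix.
    static List<String> prefixScan(TreeMap<String, String> store, String prefix) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : store.tailMap(prefix).entrySet()) {
            if (!e.getKey().startsWith(prefix)) {
                break; // past the prefix range in sorted order
            }
            result.add(e.getKey() + "=" + e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        TreeMap<String, String> store = new TreeMap<>();
        store.put("user:1", "alice");
        store.put("user:2", "bob");
        store.put("order:9", "pending");
        System.out.println(prefixScan(store, "user:")); // [user:1=alice, user:2=bob]
    }
}
```

The seek-then-break structure is what makes prefix scan cheap on a sorted store: it touches only the matching range, not the whole keyspace.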
[jira] [Created] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
Sagar Rao created KAFKA-9168: Summary: Integrate JNI direct buffer support to RocksDBStore Key: KAFKA-9168 URL: https://issues.apache.org/jira/browse/KAFKA-9168 Project: Kafka Issue Type: Task Reporter: Sagar Rao A PR has been opened on the RocksDB Java client to support direct ByteBuffers. We can look at integrating it into RocksDBStore once it is merged. Link to PR: [https://github.com/facebook/rocksdb/pull/2283] -- This message was sent by Atlassian Jira (v8.3.4#803005)
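For context on what that RocksDB PR enables: a direct `ByteBuffer` is allocated outside the Java heap, so native (JNI) code such as the RocksDB library can read and write its memory without an extra copy across the JNI boundary. A minimal demonstration of allocating and round-tripping data through a direct buffer:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DirectBufferSketch {
    public static void main(String[] args) {
        // Off-heap allocation: native code can address this memory directly.
        ByteBuffer buf = ByteBuffer.allocateDirect(64);
        buf.put("key-1".getBytes(StandardCharsets.UTF_8));
        buf.flip(); // switch from writing to reading

        byte[] out = new byte[buf.remaining()];
        buf.get(out);
        System.out.println(buf.isDirect());                          // true
        System.out.println(new String(out, StandardCharsets.UTF_8)); // key-1
    }
}
```

The trade-off is that direct buffers are more expensive to allocate and free than heap arrays, so they pay off mainly for buffers that are reused across many native calls.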
[jira] [Resolved] (KAFKA-4371) Sporadic ConnectException shuts down the whole connect process
[ https://issues.apache.org/jira/browse/KAFKA-4371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao resolved KAFKA-4371. -- Resolution: Not A Problem
[jira] [Commented] (KAFKA-4371) Sporadic ConnectException shuts down the whole connect process
[ https://issues.apache.org/jira/browse/KAFKA-4371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636091#comment-15636091 ] Sagar Rao commented on KAFKA-4371: -- [~ewencp] By shutting down I meant that the processing of data doesn't happen anymore. I should have been a bit more careful in writing that. Since this is a connect-jdbc issue, I will close this ticket. I have created a separate issue on connect-jdbc: https://github.com/confluentinc/kafka-connect-jdbc/issues/160
[jira] [Commented] (KAFKA-4371) Sporadic ConnectException shuts down the whole connect process
[ https://issues.apache.org/jira/browse/KAFKA-4371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632717#comment-15632717 ] Sagar Rao commented on KAFKA-4371: -- Another thing to note is that we get this exception when I try to delete an existing connector. After that we get a CommunicationLinkException and then all the processes stop. Not sure how this is connected though.
[jira] [Created] (KAFKA-4371) Sporadic ConnectException shuts down the whole connect process
Sagar Rao created KAFKA-4371: Summary: Sporadic ConnectException shuts down the whole connect process Key: KAFKA-4371 URL: https://issues.apache.org/jira/browse/KAFKA-4371 Project: Kafka Issue Type: Bug Reporter: Sagar Rao Priority: Critical I had set up a 2-node distributed kafka-connect process. Everything went well and I could see a lot of data flowing into the relevant Kafka topics. After some time, JdbcUtils.getCurrentTimeOnDB threw a ConnectException with the following stacktrace: The last packet successfully received from the server was 792 milliseconds ago. The last packet sent successfully to the server was 286 milliseconds ago. (io.confluent.connect.jdbc.source.JdbcSourceTask:234) [2016-11-02 12:42:06,116] ERROR Failed to get current time from DB using query select CURRENT_TIMESTAMP; on database MySQL (io.confluent.connect.jdbc.util.JdbcUtils:226) com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 1,855 milliseconds ago. The last packet sent successfully to the server was 557 milliseconds ago. 
at sun.reflect.GeneratedConstructorAccessor51.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at com.mysql.jdbc.Util.handleNewInstance(Util.java:411) at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1117) at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3829) at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2449) at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2629) at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2719) at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2155) at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1379) at com.mysql.jdbc.StatementImpl.createResultSetUsingServerFetch(StatementImpl.java:651) at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1527) at io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:220) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:157) at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:78) at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:57) at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:207) at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155) at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at 
java.lang.Thread.run(Thread.java:745) Caused by: java.net.SocketException: Broken pipe (Write failed) at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109) at java.net.SocketOutputStream.write(SocketOutputStream.java:153) at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3810) ... 20 more This was just a minor glitch to the connection, as the EC2 instances are able to connect to the MySQL Aurora instances without any issues. But after this exception (which appears a number of times), none of the connectors' tasks are executing. Beyond this, all I see in the logs is [2016-11-02 16:17:41,983] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='eng_match_series', query='null', topicPrefix='ci-eng-', timestampColumn='modified', incrementingColumn='id'}: com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No operations allowed after statement closed. (io.confluent.connect.jdbc.source.JdbcSourceTask:234) Is this expected behaviour? I restarted the connector using the REST APIs but that didn't help. How do we handle such scenarios? Eventually I had to delete the connector and restart. The Kafka version I am using is 0.10.0.1-cp1, as there were some custom changes we needed to make at the Connect level. -- This message was sent by Atlassian JIRA
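The operational question at the end ("How do we handle such scenarios?") usually comes down to retrying transient failures instead of letting one dropped connection stall a task permanently. A generic sketch of retry-with-backoff (the names and limits here are illustrative, not taken from kafka-connect-jdbc, which resolved this on its own tracker):

```java
import java.util.concurrent.Callable;

public class RetrySketch {
    // Run `op`, retrying up to `maxAttempts` times with a linearly growing
    // delay between attempts. Rethrows the last failure if all attempts fail.
    static <T> T withRetry(Callable<T> op, int maxAttempts, long baseDelayMs) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                Thread.sleep(baseDelayMs * attempt); // linear backoff
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] failures = {2}; // simulate an op that fails twice, then succeeds
        String result = withRetry(() -> {
            if (failures[0]-- > 0) throw new RuntimeException("broken pipe");
            return "ok";
        }, 5, 10);
        System.out.println(result); // ok
    }
}
```

Real connectors additionally need to distinguish retriable errors (broken pipe, timeout) from permanent ones (bad credentials, closed statement), since blindly retrying a non-transient failure just hides the problem.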
[jira] [Created] (KAFKA-4342) Kafka-connect- support tinyint values
Sagar Rao created KAFKA-4342: Summary: Kafka-connect- support tinyint values Key: KAFKA-4342 URL: https://issues.apache.org/jira/browse/KAFKA-4342 Project: Kafka Issue Type: Bug Reporter: Sagar Rao We have been using kafka-connect-jdbc actively for one of our projects, and one of the issues we have noticed is the way it handles TINYINT values. Our database is MySQL, which allows both signed and unsigned values to be stored. So a column can hold values up to 255, but when kafka-connect sees values above 127, it fails. The reason is that in the ConnectSchema class, INT8 maps to a Byte, which is a signed type. The JDBC docs say the following about handling TINYINT values: https://docs.oracle.com/javase/6/docs/technotes/guides/jdbc/getstart/mapping.html 8.3.4 TINYINT The JDBC type TINYINT represents an 8-bit integer value between 0 and 255 that may be signed or unsigned. The corresponding SQL type, TINYINT, is currently supported by only a subset of the major databases. Portable code may therefore prefer to use the JDBC SMALLINT type, which is widely supported. The recommended Java mapping for the JDBC TINYINT type is as either a Java byte or a Java short. The 8-bit Java byte type represents a signed value from -128 to 127, so it may not always be appropriate for larger TINYINT values, whereas the 16-bit Java short will always be able to hold all TINYINT values. I had submitted a PR for this last week, but it failed in the Jenkins build due to an unrelated test case. If someone can take a look at this or suggest something, that would be great: https://github.com/apache/kafka/pull/2044 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
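The signed/unsigned mismatch described above is easy to reproduce in plain Java: an unsigned TINYINT value such as 200 wraps negative when forced into a byte, while a short holds every TINYINT value intact:

```java
public class TinyintSketch {
    public static void main(String[] args) {
        // An unsigned TINYINT value such as 200 does not fit in a signed
        // Java byte: the bit pattern survives, but the interpreted value
        // flips sign.
        int unsignedTinyint = 200;
        byte asByte = (byte) unsignedTinyint;
        System.out.println(asByte);                      // -56
        System.out.println(Byte.toUnsignedInt(asByte));  // 200

        // A 16-bit short holds the full TINYINT range 0..255 without wrapping.
        short asShort = (short) unsignedTinyint;
        System.out.println(asShort);                     // 200
    }
}
```

This is exactly why the quoted JDBC guidance recommends short over byte when a TINYINT column may carry unsigned values.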