[jira] [Created] (KAFKA-16604) Deprecate ConfigDef.ConfigKey constructor from public APIs

2024-04-23 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16604:
-

 Summary: Deprecate ConfigDef.ConfigKey constructor from public APIs
 Key: KAFKA-16604
 URL: https://issues.apache.org/jira/browse/KAFKA-16604
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao


Currently, one can create a ConfigKey either by invoking the public constructor 
directly and passing the result to a ConfigDef object, or by invoking one of the 
define methods. Having two ways can get confusing, and it can also lead to 
errors, as was noticed in KAFKA-16592.

We should ideally expose only one way to users, which IMO should be creating 
these objects only through the exposed define methods. This ticket is about 
first marking the public constructor of ConfigKey as Deprecated and then 
eventually making it private.
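
For illustration, a minimal sketch of the two paths (the ConfigKey argument list 
mirrors the old 13-argument constructor visible in the KAFKA-16592 stack trace 
below; names and values here are made up, and it only compiles against a clients 
version that still exposes that constructor):

{code:java}
import java.util.Collections;

import org.apache.kafka.common.config.ConfigDef;

public class ConfigKeyExample {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef();

        // Path 1 (preferred): a define(...) overload builds the ConfigKey internally.
        def.define("my.setting", ConfigDef.Type.STRING, "default",
                ConfigDef.Importance.HIGH, "An example setting.");

        // Path 2 (proposed for deprecation): construct ConfigDef.ConfigKey directly
        // and register it afterwards.
        ConfigDef.ConfigKey key = new ConfigDef.ConfigKey(
                "my.other.setting", ConfigDef.Type.STRING, "default", null,
                ConfigDef.Importance.HIGH, "Another example setting.",
                null, -1, ConfigDef.Width.NONE, "my.other.setting",
                Collections.emptyList(), null, false);
        def.define(key);
    }
}
{code}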



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16592) ConfigKey constructor update can break clients using it

2024-04-20 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16592:
-

 Summary: ConfigKey constructor update can break clients using it
 Key: KAFKA-16592
 URL: https://issues.apache.org/jira/browse/KAFKA-16592
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


In [KAFKA-14957|https://issues.apache.org/jira/browse/KAFKA-14957], the 
constructor of ConfigDef.ConfigKey was updated to add a new argument called 
{*}alternativeString{*}. As part of the PR, new *define* methods were also 
added, which makes sense. However, since the constructor of 
*ConfigDef.ConfigKey* itself can be invoked directly by clients that import the 
dependency, this change breaks every client that was using the older 
constructor without the *alternativeString* argument.

I bumped into this while testing the 
[kafka-connect-redis|https://github.com/jcustenborder/kafka-connect-redis/tree/master] 
connector. It starts up correctly against the official 3.7 release, but fails 
with the following error when run against a 3.8 snapshot:

 

 
{code:java}
Caused by: java.lang.NoSuchMethodError: 
org.apache.kafka.common.config.ConfigDef$ConfigKey.(Ljava/lang/String;Lorg/apache/kafka/common/config/ConfigDef$Type;Ljava/lang/Object;Lorg/apache/kafka/common/config/ConfigDef$Validator;Lorg/apache/kafka/common/config/ConfigDef$Importance;Ljava/lang/String;Ljava/lang/String;ILorg/apache/kafka/common/config/ConfigDef$Width;Ljava/lang/String;Ljava/util/List;Lorg/apache/kafka/common/config/ConfigDef$Recommender;Z)V
 at 
com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder.build(ConfigKeyBuilder.java:62)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisConnectorConfig.config(RedisConnectorConfig.java:133)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisSinkConnectorConfig.config(RedisSinkConnectorConfig.java:46)
 at 
com.github.jcustenborder.kafka.connect.redis.RedisSinkConnector.config(RedisSinkConnector.java:73)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:538)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.lambda$validateConnectorConfig$3(AbstractHerder.java:412)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 ... 1 more
 
{code}
 

The reason is that the connector uses another library called connect-utils, 
which invokes the old constructor 
[directly|https://github.com/jcustenborder/connect-utils/blob/master/connect-utils/src/main/java/com/github/jcustenborder/kafka/connect/utils/config/ConfigKeyBuilder.java#L62].

Connector invocations are not expected to fail across versions, so this would 
cause confusion.

One could argue that the constructor shouldn't be invoked directly instead of 
using the *define* methods, but there may be other clients doing the same. We 
should add the old constructor back and have it call the new one with 
*alternativeString* set to null.
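
A rough sketch of what the re-added constructor could look like inside 
ConfigDef.ConfigKey (this assumes the new constructor simply appends 
*alternativeString* as a trailing parameter; it is illustrative, not the exact 
patch):

{code:java}
// Re-added old 13-argument constructor, delegating to the new one so that callers
// compiled against the pre-KAFKA-14957 signature remain binary compatible.
public ConfigKey(String name, Type type, Object defaultValue, Validator validator,
                 Importance importance, String documentation, String group,
                 int orderInGroup, Width width, String displayName,
                 List<String> dependents, Recommender recommender, boolean internalConfig) {
    this(name, type, defaultValue, validator, importance, documentation, group,
         orderInGroup, width, displayName, dependents, recommender, internalConfig,
         null); // alternativeString defaults to null
}
{code}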



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16197) Connect Worker poll timeout prints Consumer poll timeout specific warnings.

2024-01-26 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16197:
-

 Summary: Connect Worker poll timeout prints Consumer poll timeout 
specific warnings.
 Key: KAFKA-16197
 URL: https://issues.apache.org/jira/browse/KAFKA-16197
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


When a worker's poll timeout expires in Connect, the log lines that we see are:

{noformat}
consumer poll timeout has expired. This means the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically 
implies that the poll loop is spending too much time processing messages. You 
can address this either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches returned in poll() with max.poll.records.

{noformat}

and the reason for leaving the group is 


{noformat}
Member XX sending LeaveGroup request to coordinator XX due to consumer poll 
timeout has expired.
{noformat}

Both of these messages are specific to consumers, not to Connect workers. The 
log line above is especially misleading because the config `max.poll.interval.ms` 
is not configurable for a Connect worker, and it could make someone believe the 
logs are being written for sink connectors rather than the Connect worker. 
Ideally, we should print something specific to Connect.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16056) Worker poll timeout expiry can lead to Duplicate task assignments.

2023-12-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-16056:
-

 Summary: Worker poll timeout expiry can lead to Duplicate task 
assignments.
 Key: KAFKA-16056
 URL: https://issues.apache.org/jira/browse/KAFKA-16056
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


When a worker's poll timeout expires, it triggers a rebalance because the worker 
proactively leaves the group. Under normal scenarios, this departure would 
trigger a scheduled rebalance delay. One thing to note, however, is that the 
worker which temporarily left the group doesn't give up its assignments, so 
whatever tasks were running on it keep running. When the scheduled rebalance 
delay elapses, it simply gets its assignments back and, since there are no 
revocations, everything works out fine.

But there is an edge case here. Assume a scheduled rebalance delay is already 
active on the group and, just before the follow-up rebalance triggered by that 
delay elapsing, one worker's poll timeout expires. At this point a rebalance is 
imminent, and the leader tracks the assignments of the transiently departed 
worker as lost 
[here|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L255]. 
When 
[handleLostAssignments|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L441] 
gets triggered, because the scheduled rebalance delay isn't reset yet, and if 
[this|https://github.com/apache/kafka/blob/d582d5aff517879b150bc2739bad99df07e15e2b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java#L473] 
condition passes, the leader assumes it needs to reassign all the lost 
assignments, which it does.

But because the worker whose poll timeout expired doesn't rescind its 
assignments, we end up with duplicate assignments: one set on the original 
worker which was already running the tasks and connectors, and another set on 
the remaining workers which received the redistributed work. This could lead to 
task failures if the connector has been written in a way that expects no 
duplicate tasks running across the cluster.

Also, this edge case can be encountered more frequently if the 
`rebalance.timeout.ms` config is set to a lower value. 

One approach could be to do something similar to 
https://issues.apache.org/jira/browse/KAFKA-9184, where, upon coordinator 
discovery failure, the worker gives up all its assignments and rejoins with an 
empty assignment. We could do something similar in this case as well.




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15296) Allow committing offsets for Dropped records via SMTs

2023-08-02 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15296:
-

 Summary: Allow committing offsets for Dropped records via SMTs
 Key: KAFKA-15296
 URL: https://issues.apache.org/jira/browse/KAFKA-15296
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


Currently the Connect runtime doesn't commit the offsets of records which have 
been dropped by an SMT. This can lead to issues if the dropped record's 
partition reflects a source partition and the connector depends upon the 
committed offsets to make progress. In such cases, the connector might just 
stall. We should enable committing offsets for dropped records as well. Note 
that today, if a record is dropped because exactly-once support is enabled and 
the connector chose to abort the batch containing the record, its offset is 
still committed, so there already exists a discrepancy in the way the runtime 
treats these dropped records.
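
For context, an SMT drops a record simply by returning null from its apply 
method; a minimal illustrative transform (not from this ticket, purely to show 
the mechanism) looks like this:

{code:java}
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

/** Illustrative SMT that drops every record it sees. */
public class DropAllTransform<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        // Returning null drops the record. For a source connector the runtime then never
        // produces it, and today it also never commits the dropped record's source offset.
        return null;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed
    }

    @Override
    public void close() {
        // nothing to clean up
    }
}
{code}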



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-15005) Status of KafkaConnect task not correct

2023-07-24 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-15005.
---
Resolution: Duplicate

> Status of KafkaConnect task not correct
> ---
>
> Key: KAFKA-15005
> URL: https://issues.apache.org/jira/browse/KAFKA-15005
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.1, 3.0.0, 3.3.2
>Reporter: Yu Wang
>Priority: Major
>
> Our MM2 is running version 2.5.1.
> After a rebalance of our MM2 source tasks, we found several tasks that were 
> always in *UNASSIGNED* status, even though the real tasks had already started. 
> So we dumped the payload of the Kafka Connect status topic, and found that the 
> last two status changes were *RUNNING* followed by 
> {*}UNASSIGNED{*}.
> {code:java}
> LogAppendTime: 1684167885961 keysize: 64 valuesize: 95 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"RUNNING","trace":null,"worker_id":"worker-1","generation":437643}
> LogAppendTime: 1684167903456 keysize: 64 valuesize: 98 sequence: -1 
> headerKeys: [] key: task-7 payload: 
> {"state":"UNASSIGNED","trace":null,"worker_id":"worker-2","generation":437643}
>  {code}
> But usually the RUNNING status should be appended after the UNASSIGNED one, 
> because the worker coordinator revokes tasks before starting new tasks.
> Then we checked the logs of our MM2 workers and found that, during that time, 
> a task was revoked on worker-2 and started on worker-1.
>  
> Worker-1
> {code:java}
> [2023-05-15 09:24:45,950] INFO [Worker clientId=connect-1, 
> groupId=__group] Starting task task-7 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> [2023-05-15 09:24:45,951] INFO Creating task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
> Worker-2
> {code:java}
> [2023-05-15 09:24:40,922] INFO Stopping task task-7 
> (org.apache.kafka.connect.runtime.Worker) {code}
>  
> So I think the incorrect status was caused by the revoked task finishing later 
> than the newly started task, which made the UNASSIGNED status get appended to 
> the status topic after the RUNNING status. 
>  
> After reading the code of DistributedHerder, I found that task revocation runs 
> in a thread pool, and the revoke operation returns right after submitting all 
> the callables. So even on the same worker, there is no guarantee 
> that the revoke operation will always finish before the new tasks start.
> {code:java}
> for (final ConnectorTaskId taskId : tasks) {
> callables.add(getTaskStoppingCallable(taskId));
> }
> // The actual timeout for graceful task/connector stop is applied in worker's
> // stopAndAwaitTask/stopAndAwaitConnector methods.
> startAndStop(callables);
> log.info("Finished stopping tasks in preparation for rebalance"); {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12283) Flaky Test RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12283?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-12283.
---
Resolution: Fixed

> Flaky Test 
> RebalanceSourceConnectorsIntegrationTest#testMultipleWorkersRejoining
> 
>
> Key: KAFKA-12283
> URL: https://issues.apache.org/jira/browse/KAFKA-12283
> Project: Kafka
>  Issue Type: Test
>  Components: KafkaConnect, unit tests
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
>
> https://github.com/apache/kafka/pull/1/checks?check_run_id=1820092809
> {quote} {{java.lang.AssertionError: Tasks are imbalanced: 
> localhost:36037=[seq-source13-0, seq-source13-1, seq-source13-2, 
> seq-source13-3, seq-source12-0, seq-source12-1, seq-source12-2, 
> seq-source12-3]
> localhost:43563=[seq-source11-0, seq-source11-2, seq-source10-0, 
> seq-source10-2]
> localhost:46539=[seq-source11-1, seq-source11-3, seq-source10-1, 
> seq-source10-3]
>   at org.junit.Assert.fail(Assert.java:89)
>   at org.junit.Assert.assertTrue(Assert.java:42)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.assertConnectorAndTasksAreUniqueAndBalanced(RebalanceSourceConnectorsIntegrationTest.java:362)
>   at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:303)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:351)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:319)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:300)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:290)
>   at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testMultipleWorkersRejoining(RebalanceSourceConnectorsIntegrationTest.java:313)}}
> {quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2023-07-23 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8391?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-8391.
--
Resolution: Fixed

Fixed with https://github.com/apache/kafka/pull/12561

> Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
> ---
>
> Key: KAFKA-8391
> URL: https://issues.apache.org/jira/browse/KAFKA-8391
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Critical
>  Labels: flaky-test
> Attachments: 100-gradle-builds.tar
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> Connector tasks did not stop in time. at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352) at 
> org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15229) Increase default value of task.shutdown.graceful.timeout.ms

2023-07-21 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15229:
-

 Summary: Increase default value of 
task.shutdown.graceful.timeout.ms
 Key: KAFKA-15229
 URL: https://issues.apache.org/jira/browse/KAFKA-15229
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


The Kafka Connect config 
[task.shutdown.graceful.timeout.ms|https://kafka.apache.org/documentation/#connectconfigs_task.shutdown.graceful.timeout.ms] 
has a default value of 5s. As per its definition:

 
{noformat}
Amount of time to wait for tasks to shutdown gracefully. This is the total 
amount of time, not per task. All task have shutdown triggered, then they are 
waited on sequentially.{noformat}

it is the total timeout for all tasks to shut down, and if multiple tasks are 
to be shut down they are waited upon sequentially. The default value of this 
config is fine for smaller clusters with few tasks, but on a larger cluster the 
timeout can elapse and we will see a lot of messages of the form

```
Graceful stop of task  failed.
```

When the graceful stop of a task fails, the task is cancelled, which means it 
won't send out a status update; no `UNASSIGNED` status message will be posted 
for that task. Say the task stop was triggered by a worker going down. If the 
cluster is configured to use the Incremental Cooperative Assignor, the task 
won't be reassigned until the scheduled.rebalance.delay.max.ms interval elapses. 
For that duration, the task shows up with status RUNNING whenever its status is 
queried. This can be confusing for users.

This problem can be exacerbated in cloud environments (like Kubernetes pods) 
because there is a high chance that the RUNNING status would be associated with 
an older worker_id which doesn't even exist in the cluster anymore.

The net effect of all of this is not catastrophic, i.e. it won't lead to any 
processing delays or loss of data, but the status of the task would be off. And 
if fast rebalances are happening under the Incremental Cooperative Assignor, 
that duration could be long as well.

So, the proposal is to increase the default to a higher value. I am thinking we 
can set it to 60s because, as far as I can see, it doesn't interfere with any 
other timeout that we have.
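
Until the default changes, the value can already be raised per cluster through 
the worker configuration, for example (a hypothetical override; value in 
milliseconds):

{noformat}
task.shutdown.graceful.timeout.ms=60000
{noformat}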

I am tagging this as need-kip because I believe we will need one.






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-12525) Inaccurate task status due to status record interleaving in fast rebalances in Connect

2023-07-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-12525.
---
Resolution: Fixed

> Inaccurate task status due to status record interleaving in fast rebalances 
> in Connect
> --
>
> Key: KAFKA-12525
> URL: https://issues.apache.org/jira/browse/KAFKA-12525
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.1, 2.4.1, 2.5.1, 2.7.0, 2.6.1
>Reporter: Konstantine Karantasis
>Assignee: Sagar Rao
>Priority: Major
>
> When a task is stopped in Connect it produces an {{UNASSIGNED}} status 
> record. 
> Equivalently, when a task is started or restarted in Connect it produces a 
> {{RUNNING}} status record in the Connect status topic.
> At the same time, rebalances are decoupled from task start and stop. These 
> operations happen in a separate executor outside of the main worker thread that 
> performs the rebalance.
> Normally, any delayed and stale {{UNASSIGNED}} status records are fenced by 
> the worker that is sending them. This worker is using the 
> {{StatusBackingStore#putSafe}} method that will reject any stale status 
> messages (called only for {{UNASSIGNED}} or {{FAILED}}) as long as the worker 
> is aware of the newer status record that declares a task as {{RUNNING}}.
> In cases of fast consecutive rebalances where a task is revoked from one 
> worker and assigned to another one, it has been observed that there is a 
> small time window and thus a race condition during which a {{RUNNING}} status 
> record in the new generation is produced and is immediately followed by a 
> delayed {{UNASSIGNED}} status record belonging to the same or a previous 
> generation before the worker that sends this message reads the {{RUNNING}} 
> status record that corresponds to the latest generation.
> A couple of options are available to remediate this race condition. 
> For example, a worker that has started a task can re-write the {{RUNNING}} 
> status message in the topic if it reads a stale {{UNASSIGNED}} message from a 
> previous generation (that should have been fenced). 
> Another option is to ignore stale {{UNASSIGNED}} messages (messages from an 
> earlier generation than the one in which the task had {{RUNNING}} status).
> Worth noting that when this race condition takes place, besides the 
> inaccurate status representation, the actual execution of the tasks remains 
> unaffected (e.g. the tasks are running correctly even though they appear as 
> {{UNASSIGNED}}). 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15127) Allow offsets to be reset at the same time a connector is deleted.

2023-06-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15127:
-

 Summary: Allow offsets to be reset at the same time a connector is 
deleted.
 Key: KAFKA-15127
 URL: https://issues.apache.org/jira/browse/KAFKA-15127
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Sagar Rao


This has been listed as [Future 
Work|https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Automaticallydeleteoffsetswithconnectors] 
in KIP-875. Now that the delete-offsets mechanism is also in place, we can take 
this up, which will allow connector names to be reused after connector 
deletion.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly

2023-06-18 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14913.
---
Resolution: Fixed

> Migrate DistributedHerder Executor shutdown to use 
> ThreadUtils#shutdownExecutorServiceQuietly
> -
>
> Key: KAFKA-14913
> URL: https://issues.apache.org/jira/browse/KAFKA-14913
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Minor
>
> Some context here: 
> https://github.com/apache/kafka/pull/13557#issuecomment-1509738740



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15041) Source Connector auto topic creation fails when topic is deleted and brokers don't support auto topic creation

2023-05-31 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-15041:
-

 Summary: Source Connector auto topic creation fails when topic is 
deleted and brokers don't support auto topic creation
 Key: KAFKA-15041
 URL: https://issues.apache.org/jira/browse/KAFKA-15041
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao
Assignee: Sagar Rao


[KIP-158|https://cwiki.apache.org/confluence/display/KAFKA/KIP-158%3A+Kafka+Connect+should+allow+source+connectors+to+set+topic-specific+settings+for+new+topics] 
allows source connectors to create topics even when the broker doesn't allow 
automatic topic creation. The runtime does this by checking, for every record, 
whether a topic needs to be created 
([https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L500]). 
To avoid checking for topic presence via the admin client on every record, it 
also maintains a cache of the topics it has already created and doesn't try to 
create those again. This is how topics get created when brokers don't support 
automatic topic creation.

However, let's say the topic gets created initially and is later deleted while 
the connector is still running, and the brokers don't support automatic topic 
creation. In that case the connector has cached the topic it already created 
and won't recreate it, because the cache is never updated; and since the broker 
doesn't support automatic topic creation, the logs just fill up with messages 
like

```
Error while fetching metadata with correlation id 3260 : 
{connect-test=UNKNOWN_TOPIC_OR_PARTITION} 
```

 

This can become a problem in environments where brokers don't allow automatic 
topic creation. We need a way to refresh the topic-creation cache for such cases.
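
A minimal sketch of the idea (class and method names are illustrative, not the 
actual Connect runtime classes): keep the created-topics cache, but evict an 
entry when the broker reports the topic as missing, so that the next record 
triggers re-creation.

{code:java}
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative cache of topics the source task believes it has created. */
public class CreatedTopicsCache {
    private final Set<String> created = ConcurrentHashMap.newKeySet();

    /** True if we have not created (or have since invalidated) this topic. */
    public boolean needsCreation(String topic) {
        return !created.contains(topic);
    }

    public void markCreated(String topic) {
        created.add(topic);
    }

    /** Called, e.g., when a produce attempt fails with UNKNOWN_TOPIC_OR_PARTITION. */
    public void invalidate(String topic) {
        created.remove(topic);
    }
}
{code}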



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14997) JmxToolTest failing with initializationError

2023-05-13 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14997:
-

 Summary: JmxToolTest failing with initializationError
 Key: KAFKA-14997
 URL: https://issues.apache.org/jira/browse/KAFKA-14997
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Federico Valeri


Noticed that JmxToolTest fails with 

```
h4. Error
java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: 
javax.naming.ServiceUnavailableException [Root exception is 
java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested 
exception is: 
 java.net.ConnectException: Connection timed out (Connection timed out)]
h4. Stacktrace
java.io.IOException: Cannot bind to URL [rmi://:44743/jmxrmi]: 
javax.naming.ServiceUnavailableException [Root exception is 
java.rmi.ConnectException: Connection refused to host: 40.117.157.99; nested 
exception is: 
 java.net.ConnectException: Connection timed out (Connection timed out)]
 at 
javax.management.remote.rmi.RMIConnectorServer.newIOException(RMIConnectorServer.java:827)
 at 
javax.management.remote.rmi.RMIConnectorServer.start(RMIConnectorServer.java:432)
 at org.apache.kafka.tools.JmxToolTest.startJmxAgent(JmxToolTest.java:337)
 at org.apache.kafka.tools.JmxToolTest.beforeAll(JmxToolTest.java:55)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptLifecycleMethod(TimeoutExtension.java:128)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptBeforeAllMethod(TimeoutExtension.java:70)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllMethods$13(ClassBasedTestDescriptor.java:411)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllMethods(ClassBasedTestDescriptor.java:409)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:215)
 at 
org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:84)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
 at java.util.ArrayList.forEach(ArrayList.java:1259)
 at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
 at 

[jira] [Created] (KAFKA-14971) Flaky Test org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs

2023-05-07 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14971:
-

 Summary: Flaky Test 
org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs
 Key: KAFKA-14971
 URL: https://issues.apache.org/jira/browse/KAFKA-14971
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


The test 
`org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest#testSyncTopicConfigs` 
seems to be flaky. Found here: 
[https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-13594/6/tests]

Ran locally against the [same PR|https://github.com/apache/kafka/pull/13594] and 
it passed.

 

```
h4. Error
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
h4. Stacktrace
org.opentest4j.AssertionFailedError: `delete.retention.ms` should be 2000, 
because it's explicitly defined on the target topic! ==> expected: <2000> but 
was: <8640>
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
 at app//org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
 at app//org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
 at app//org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1153)
 at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.lambda$testSyncTopicConfigs$8(MirrorConnectorsIntegrationBaseTest.java:758)
 at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:325)
 at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:373)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:322)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:306)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:296)
 at 
app//org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest.testSyncTopicConfigs(MirrorConnectorsIntegrationBaseTest.java:752)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
 at 
app//org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
app//org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
 at 
app//org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
app//org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
app//org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
 at 
app//org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
 at 
app//org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
 at 

[jira] [Created] (KAFKA-14956) Flaky test org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted

2023-05-02 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14956:
-

 Summary: Flaky test 
org.apache.kafka.connect.integration.OffsetsApiIntegrationTest#testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted
 Key: KAFKA-14956
 URL: https://issues.apache.org/jira/browse/KAFKA-14956
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


```
h4. Error
org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
Sink connector consumer group offsets should catch up to the topic end offsets 
==> expected:  but was: 
h4. Stacktrace
org.opentest4j.AssertionFailedError: Condition not met within timeout 15000. 
Sink connector consumer group offsets should catch up to the topic end offsets 
==> expected:  but was: 
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
 at 
app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
 at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
 at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
 at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:211)
 at 
app//org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:337)
 at 
app//org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:334)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:318)
 at app//org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:291)
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:150)
 at 
app//org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsetsDifferentKafkaClusterTargeted(OffsetsApiIntegrationTest.java:131)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
 at 
app//org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
app//org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
app//org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
app//org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
app//org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at 
app//org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at app//org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
app//org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at app//org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
 at app//org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
 at app//org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
 at app//org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
 at app//org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
 at app//org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at app//org.junit.runners.ParentRunner.run(ParentRunner.java:413)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native 
Method)
 at 
java.base@17.0.7/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
 at 
java.base@17.0.7/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.base@17.0.7/java.lang.reflect.Method.invoke(Method.java:568)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 

[jira] [Created] (KAFKA-14938) Flaky test org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary

2023-04-26 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14938:
-

 Summary: Flaky test 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest#testConnectorBoundary
 Key: KAFKA-14938
 URL: https://issues.apache.org/jira/browse/KAFKA-14938
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Sagar Rao


Test seems to be failing with 

```
java.lang.AssertionError: Not enough records produced by source connector. 
Expected at least: 100 + but got 72
h4. Stacktrace
java.lang.AssertionError: Not enough records produced by source connector. 
Expected at least: 100 + but got 72
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.assertTrue(Assert.java:42)
 at 
org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest.testConnectorBoundary(ExactlyOnceSourceIntegrationTest.java:421)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
 at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
 at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
 at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
 at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
 at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
 at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
 at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
 at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
 at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
 at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
 at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
 at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
 at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
 at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:108)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
 at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:40)
 at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:60)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:52)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
 at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$2.run(TestWorker.java:176)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
 at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
 at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
 at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
 at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14913) Migrate DistributedHerder Executor shutdown to use ThreadUtils#shutdownExecutorServiceQuietly

2023-04-17 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14913:
-

 Summary: Migrate DistributedHerder Executor shutdown to use 
ThreadUtils#shutdownExecutorServiceQuietly
 Key: KAFKA-14913
 URL: https://issues.apache.org/jira/browse/KAFKA-14913
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14586) Move StreamsResetter to tools

2023-04-04 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14586.
---
Resolution: Fixed

> Move StreamsResetter to tools
> -
>
> Key: KAFKA-14586
> URL: https://issues.apache.org/jira/browse/KAFKA-14586
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Sagar Rao
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14851) Move StreamResetterTest to tools module

2023-03-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14851:
-

 Summary: Move StreamResetterTest to tools module
 Key: KAFKA-14851
 URL: https://issues.apache.org/jira/browse/KAFKA-14851
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sagar Rao


This came up as a suggestion here: 
[https://github.com/apache/kafka/pull/13127#issuecomment-1433155607] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14734) Use CommandDefaultOptions in StreamsResetter

2023-02-20 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14734:
-

 Summary: Use CommandDefaultOptions in StreamsResetter 
 Key: KAFKA-14734
 URL: https://issues.apache.org/jira/browse/KAFKA-14734
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sagar Rao


This came up as a suggestion here: 
[https://github.com/apache/kafka/pull/13127#issuecomment-1433155607] .



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14647) Move TopicFilter shared class

2023-01-23 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14647:
-

 Summary: Move TopicFilter shared class
 Key: KAFKA-14647
 URL: https://issues.apache.org/jira/browse/KAFKA-14647
 Project: Kafka
  Issue Type: Sub-task
Reporter: Sagar Rao
Assignee: Federico Valeri






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-10652) Raft leader should flush accumulated writes after a min size is reached

2023-01-21 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-10652.
---
Resolution: Won't Fix

Not sure this is needed anymore.

> Raft leader should flush accumulated writes after a min size is reached
> ---
>
> Key: KAFKA-10652
> URL: https://issues.apache.org/jira/browse/KAFKA-10652
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Sagar Rao
>Priority: Major
>
> In KAFKA-10601, we implemented linger semantics similar to the producer to 
> let the leader accumulate a batch of writes before fsyncing them to disk. 
> Currently the fsync is only based on the linger time, but it would be helpful 
> to make it size-based as well. In other words, if we accumulate a 
> configurable N bytes, then we should not wait for linger expiration and 
> should just fsync immediately.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13296) Verify old assignment within StreamsPartitionAssignor

2023-01-21 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-13296.
---
Resolution: Fixed

> Verify old assignment within StreamsPartitionAssignor
> -
>
> Key: KAFKA-13296
> URL: https://issues.apache.org/jira/browse/KAFKA-13296
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sagar Rao
>Priority: Major
>
> `StreamsPartitionAssignor` is responsible to assign partitions and tasks to 
> all StreamsThreads within an application.
> While it ensures that a single partition/task is not assigned to two threads, 
> there is limited verification of this. In particular, we had one incident in 
> which a zombie thread/consumer did not clean up its own internal state 
> correctly due to KAFKA-12983. This unclean zombie state implied that the _old 
> assignment_ reported to `StreamsPartitionAssignor` contained a single partition 
> for two consumers. As a result, both threads/consumers later revoked the same 
> partition and the zombie thread could commit its unclean work (even though it 
> should have been fenced), leading to duplicate output under EOS_v2.
> We should consider adding a check to `StreamsPartitionAssignor` that the _old 
> assignment_ is valid, i.e., no partition should be missing and no partition 
> should be assigned to two consumers. In that case, we should log the invalid 
> _old assignment_ and send an error code back to all consumers indicating 
> that they should shut down "unclean" (i.e., without flushing and without 
> committing any offsets or transactions).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14461) StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to check for active partitions seems brittle.

2022-12-11 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14461:
-

 Summary: 
StoreQueryIntegrationTest#shouldQuerySpecificActivePartitionStores logic to 
check for active partitions seems brittle.
 Key: KAFKA-14461
 URL: https://issues.apache.org/jira/browse/KAFKA-14461
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


The newly added test 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions 
(added as part of KIP-837) passes when run individually but fails when run as 
part of the IT class, and hence is marked as Ignored. 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14454) KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions passes when run individua

2022-12-07 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14454:
-

 Summary: 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions
 passes when run individually but not when is run as part of the IT
 Key: KAFKA-14454
 URL: https://issues.apache.org/jira/browse/KAFKA-14454
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Assignee: Sagar Rao


The newly added test 
KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest#shouldThrowIllegalArgumentExceptionWhenCustomPartionerReturnsMultiplePartitions 
(added as part of KIP-837) passes when run individually but fails when run as 
part of the IT class, and hence is marked as Ignored. 

As part of this ticket, we can also look at moving this class to JUnit 5 
annotations, since it currently relies on JUnit 4 ones.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14401) Connector/Tasks reading offsets can get stuck if underneath WorkThread dies

2022-11-18 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14401:
-

 Summary: Connector/Tasks reading offsets can get stuck if 
underneath WorkThread dies
 Key: KAFKA-14401
 URL: https://issues.apache.org/jira/browse/KAFKA-14401
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


When a connector or task tries to read offsets from the offsets topic, it 
invokes the `OffsetStorageReaderImpl#offsets` method. This method gets a Future 
from the underlying Kafka-backed offset store, which invokes the 
`KafkaBasedLog#readToEnd` method and passes it a Callback. That method 
essentially adds the Callback to a queue of callbacks that are being managed.

Within KafkaBasedLog, there's a WorkThread which keeps polling the callback 
queue and executing the callbacks in an infinite loop. However, there is an 
enclosing try/catch block around the while loop. If an exception is thrown that 
is not caught by any of the inner catch blocks, control goes to the outermost 
catch block and the WorkThread is terminated. The connectors/tasks are not aware 
of this, though, and they keep submitting callbacks to KafkaBasedLog with nobody 
processing them. This can be seen in the thread dumps as well:

 

```

"task-thread-connector-0" #6334 prio=5 os_prio=0 cpu=19.36ms elapsed=2092.93s 
tid=0x7f8d9c037000 nid=0x5d00 waiting on condition  [0x7f8dc08cd000]
   java.lang.Thread.State: WAITING (parking)
    at jdk.internal.misc.Unsafe.park(java.base@11.0.15/Native Method)
    - parking to wait for  <0x00070345c9a8> (a 
java.util.concurrent.CountDownLatch$Sync)
    at 
java.util.concurrent.locks.LockSupport.park(java.base@11.0.15/LockSupport.java:194)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.15/AbstractQueuedSynchronizer.java:885)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1039)
    at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.15/AbstractQueuedSynchronizer.java:1345)
    at 
java.util.concurrent.CountDownLatch.await(java.base@11.0.15/CountDownLatch.java:232)
    at 
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:98)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:101)
    at 
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)

```

 

We need a mechanism to restart the WorkThread if it dies. This could be done in 
the outermost catch block, for example.
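
A minimal sketch of that idea (illustrative only, not the actual 
KafkaBasedLog/WorkThread code): let the outermost catch start a replacement 
thread so queued callbacks keep getting serviced.

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Illustrative work loop that restarts itself if the worker thread dies unexpectedly. */
public class SelfHealingWorkLoop {
    private final BlockingQueue<Runnable> callbacks = new LinkedBlockingQueue<>();
    private volatile boolean stopped = false;

    /** Connectors/tasks enqueue their read-to-end callbacks here. */
    public void submit(Runnable callback) {
        callbacks.add(callback);
    }

    public void start() {
        Thread worker = new Thread(() -> {
            try {
                while (!stopped) {
                    callbacks.take().run();
                }
            } catch (Throwable t) {
                // Outermost catch: instead of letting the thread die silently,
                // start a replacement so pending callbacks are still processed.
                if (!stopped) {
                    start();
                }
            }
        }, "work-thread");
        worker.setDaemon(true);
        worker.start();
    }

    public void stop() {
        stopped = true;
    }
}
{code}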

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14007) Connect header converters are never closed

2022-08-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14007.
---
Resolution: Fixed

> Connect header converters are never closed
> --
>
> Key: KAFKA-14007
> URL: https://issues.apache.org/jira/browse/KAFKA-14007
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Sagar Rao
>Priority: Major
>
> The [HeaderConverter 
> interface|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java#L27]
>  extends {{Closeable}}, but {{HeaderConverter::close}} is never actually 
> invoked anywhere. We can and should start invoking it, probably wrapped in 
> [Utils::closeQuietly|https://github.com/apache/kafka/blob/1e21201ea24389bdaccb8a462f3a53e356b58a58/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L999-L1010]
>  so that any invalid logic in that method for custom header converters that 
> has to date gone undetected will not cause new task failures.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-14012) passing a "method" into the `Utils.closeQuietly` method cause NPE

2022-08-20 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-14012.
---
Resolution: Fixed

> passing a "method" into the `Utils.closeQuietly` method cause NPE
> -
>
> Key: KAFKA-14012
> URL: https://issues.apache.org/jira/browse/KAFKA-14012
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 3.2.0
>Reporter: Luke Chen
>Assignee: Sagar Rao
>Priority: Major
>
> The Utils.closeQuietly method accepts an `AutoCloseable` object and closes it. 
> But in some places we passed a "method" into Utils.closeQuietly, which causes 
> the object not to be closed as expected. 
> It appears in:
> - WorkerConnector
> - AbstractWorkerSourceTask
> - KafkaConfigBackingStore
>  
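
To make the pitfall concrete, here is a small self-contained sketch of the 
difference between handing closeQuietly the object itself versus a method 
reference: when the target happens to be null, evaluating the method reference 
dereferences it before closeQuietly's own null check can run.

```
import org.apache.kafka.common.utils.Utils;

public class CloseQuietlyPitfall {
    public static void main(String[] args) {
        AutoCloseable converter = null; // e.g. a resource that was never created

        // Safe: closeQuietly null-checks its argument, so this is a no-op.
        Utils.closeQuietly(converter, "converter");

        // Unsafe: evaluating `converter::close` dereferences the null receiver
        // immediately and throws a NullPointerException here, before
        // closeQuietly is ever entered.
        Utils.closeQuietly(converter::close, "converter");
    }
}
```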



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14040) Improve test coverage for max buffer bytes metrics

2022-07-03 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-14040:
-

 Summary: Improve test coverage for max buffer bytes metrics
 Key: KAFKA-14040
 URL: https://issues.apache.org/jira/browse/KAFKA-14040
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sagar Rao
Assignee: Sagar Rao


In some EOS applications with relatively long restoration times we've noticed a 
series of ProducerFencedExceptions occurring during/immediately after 
restoration. The broker logs were able to confirm these were due to 
transactions timing out.

In Streams, it turns out we automatically begin a new txn when calling {{send}} 
(if there isn’t already one in flight). A {{send}} often occurs outside of a 
commit during active processing (e.g. writing to the changelog), leaving the txn 
open until the next commit. And if a StreamThread has been actively processing 
when a rebalance results in a new stateful task without revoking any existing 
tasks, the thread won’t actually commit this open txn before it goes back into 
the restoration phase while it builds up state for the new task. So the 
in-flight transaction is left open during restoration, during which the 
StreamThread only consumes from the changelog without committing, leaving it 
vulnerable to timing out when restoration times exceed the configured 
transaction.timeout.ms for the producer client.
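
Not part of this ticket, but as a hedged illustration of the knob mentioned 
above: a Streams application can give long restorations more headroom by 
raising the embedded producer's transaction timeout through the producer 
prefix. The application id, bootstrap servers, and timeout value below are 
placeholders.

```
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class EosTimeoutConfigSketch {
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");        // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
        // Raise the producer's transaction.timeout.ms so an open transaction can
        // survive a long restoration (must stay within the broker's
        // transaction.max.timeout.ms).
        props.put(StreamsConfig.producerPrefix(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG), 900_000);
        return props;
    }
}
```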



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException

2022-06-11 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13970?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-13970.
---
Resolution: Won't Fix

> TopicAdmin topic creation should be retried on TimeoutException
> ---
>
> Key: KAFKA-13970
> URL: https://issues.apache.org/jira/browse/KAFKA-13970
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Assignee: Sagar Rao
>Priority: Major
>
> org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the 
> case when there aren't enough brokers in the cluster to create a topic with 
> the expected replication factor. This logic should also handle the case when 
> there are 0 brokers in the cluster, and should retry in that case.
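
For illustration only (this is not Connect's TopicAdmin implementation), a 
retry loop along the following lines with the plain Admin client would also 
cover the zero-broker case, where topic creation surfaces as a 
TimeoutException. The attempt count and backoff are arbitrary.

```
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TimeoutException;

public class CreateTopicWithRetrySketch {
    public static void createWithRetry(Admin admin, NewTopic topic, int maxAttempts, long backoffMs)
            throws InterruptedException, ExecutionException {
        for (int attempt = 1; ; attempt++) {
            try {
                admin.createTopics(Collections.singleton(topic)).all().get();
                return;
            } catch (ExecutionException e) {
                // Only keep retrying on timeouts (e.g. no brokers available yet).
                // A production version would also treat TopicExistsException on a
                // retry as success, since an earlier attempt may have gone through.
                if (!(e.getCause() instanceof TimeoutException) || attempt >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(backoffMs);
            }
        }
    }
}
```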



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Resolved] (KAFKA-13624) Add Metric for Store Cache Size

2022-04-15 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13624?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-13624.
---
Resolution: Won't Fix

Taken care as part of KIP-770

> Add Metric for Store Cache Size
> ---
>
> Key: KAFKA-13624
> URL: https://issues.apache.org/jira/browse/KAFKA-13624
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> KIP-770 introduced a new metric called *input-buffer-bytes-total* to track 
> the total number of bytes accumulated by a task. While working through its 
> PR, it was suggested to add a similar metric, *cache-size-bytes-total*, to 
> track the cache size in bytes for a task. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2022-02-10 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-9168.
--
Resolution: Later

This would be taken up once we have a more complete solution.

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There is a PR on the RocksDB Java client to support direct ByteBuffers in 
> Java. We can look at integrating it once it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Resolved] (KAFKA-13152) Replace "buffered.records.per.partition" with "input.buffer.max.bytes"

2022-01-29 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-13152.
---
Resolution: Done

> Replace "buffered.records.per.partition" with "input.buffer.max.bytes" 
> ---
>
> Key: KAFKA-13152
> URL: https://issues.apache.org/jira/browse/KAFKA-13152
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sagar Rao
>Priority: Major
>  Labels: needs-kip
>
> The current config "buffered.records.per.partition" controls the maximum 
> number of records to bookkeep per partition; when it is exceeded, we pause 
> fetching from that partition. However, this config has two issues:
> * It's a per-partition config, so the total memory consumed depends on the 
> dynamic number of partitions assigned.
> * Record size could vary from case to case.
> Hence it's hard to bound the memory usage of this buffering. We should 
> consider deprecating that config in favour of a global one, e.g. 
> "input.buffer.max.bytes", which controls how many bytes in total are allowed 
> to be buffered. This is doable since we buffer the raw records in .



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13624) Add Metric for Store Cache Size

2022-01-27 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-13624:
-

 Summary: Add Metric for Store Cache Size
 Key: KAFKA-13624
 URL: https://issues.apache.org/jira/browse/KAFKA-13624
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sagar Rao
Assignee: Sagar Rao


The current config "buffered.records.per.partition" controls the maximum number 
of records to bookkeep per partition; when it is exceeded, we pause fetching 
from that partition. However, this config has two issues:

* It's a per-partition config, so the total memory consumed depends on the 
dynamic number of partitions assigned.
* Record size could vary from case to case.

Hence it's hard to bound the memory usage of this buffering. We should consider 
deprecating that config in favour of a global one, e.g. "input.buffer.max.bytes", 
which controls how many bytes in total are allowed to be buffered. This is 
doable since we buffer the raw records in .



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-12289) Add Tests for prefixScan in InMemoryKeyValueStore

2021-02-04 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-12289:
-

 Summary: Add Tests for prefixScan in InMemoryKeyValueStore
 Key: KAFKA-12289
 URL: https://issues.apache.org/jira/browse/KAFKA-12289
 Project: Kafka
  Issue Type: Improvement
  Components: streams, unit tests
Reporter: Sagar Rao
Assignee: Rohit Deshpande


While reviewing KIP-614, it was decided that tests for 
[CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
 need to be streamlined to use a mocked underlyingStore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10789) Streamlining Tests in ChangeLoggingKeyValueBytesStoreTest

2020-12-01 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-10789:
-

 Summary: Streamlining Tests in ChangeLoggingKeyValueBytesStoreTest
 Key: KAFKA-10789
 URL: https://issues.apache.org/jira/browse/KAFKA-10789
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao


While reviewing KIP-614, it was decided that tests for 
[CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
 need to be streamlined to use a mocked underlyingStore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10788) Streamlining Tests in CachingInMemoryKeyValueStoreTest

2020-12-01 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-10788:
-

 Summary: Streamlining Tests in CachingInMemoryKeyValueStoreTest
 Key: KAFKA-10788
 URL: https://issues.apache.org/jira/browse/KAFKA-10788
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao


While reviewing KIP-614, it was decided that tests for 
[CachingInMemoryKeyValueStoreTest.java|https://github.com/apache/kafka/pull/9508/files/899b79781d3412658293b918dce16709121accf1#diff-fdfe70d8fa0798642f0ed54785624aa9850d5d86afff2285acdf12f2775c3588]
 need to be streamlined to use a mocked underlyingStore.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10767) Add Unit Test cases for missing methods in ThreadCacheTest

2020-11-25 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-10767:
-

 Summary: Add Unit Test cases for missing methods in ThreadCacheTest
 Key: KAFKA-10767
 URL: https://issues.apache.org/jira/browse/KAFKA-10767
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao
Assignee: Sagar Rao


During the code review for KIP-614, it was noticed that RocksDbRangeIterator 
does not have any unit test cases. Here is the GitHub comment for reference:

[https://github.com/apache/kafka/pull/9508#discussion_r527612942]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10766) Add Unit Test cases for RocksDbRangeIterator

2020-11-25 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-10766:
-

 Summary: Add Unit Test cases for RocksDbRangeIterator
 Key: KAFKA-10766
 URL: https://issues.apache.org/jira/browse/KAFKA-10766
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao
Assignee: Sagar Rao


During the code review for KIP-614, it was noticed that RocksDbRangeIterator 
does not have any unit test cases. Here is the GitHub comment for reference:

[https://github.com/apache/kafka/pull/9508#discussion_r527612942]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10648) Add Prefix Scan support to State Stores

2020-10-26 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-10648:
-

 Summary: Add Prefix Scan support to State Stores
 Key: KAFKA-10648
 URL: https://issues.apache.org/jira/browse/KAFKA-10648
 Project: Kafka
  Issue Type: Improvement
Reporter: Sagar Rao
Assignee: Sagar Rao


This issue is related to the changes mentioned in:

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-614%3A+Add+Prefix+Scan+support+for+State+Stores]

 

which seeks to add prefix scan support to state stores. Currently, only the 
RocksDB and in-memory key-value stores are supported.
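
As a usage sketch of the API this KIP proposes (assuming the 
prefixScan(prefix, prefixKeySerializer) method on ReadOnlyKeyValueStore as it 
eventually landed; the store contents and names below are made up):

```
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

public class PrefixScanSketch {
    // Prints every entry whose String key starts with the given prefix.
    public static void printWithPrefix(ReadOnlyKeyValueStore<String, Long> store, String prefix) {
        try (KeyValueIterator<String, Long> it =
                 store.prefixScan(prefix, Serdes.String().serializer())) {
            while (it.hasNext()) {
                KeyValue<String, Long> entry = it.next();
                System.out.println(entry.key + " -> " + entry.value);
            }
        }
    }
}
```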



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2019-11-09 Thread Sagar Rao (Jira)
Sagar Rao created KAFKA-9168:


 Summary: Integrate JNI direct buffer support to RocksDBStore
 Key: KAFKA-9168
 URL: https://issues.apache.org/jira/browse/KAFKA-9168
 Project: Kafka
  Issue Type: Task
Reporter: Sagar Rao


There is a PR on the RocksDB Java client to support direct ByteBuffers in 
Java. We can look at integrating it once it gets merged. 

Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-4371) Sporadic ConnectException shuts down the whole connect process

2016-11-04 Thread Sagar Rao (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao resolved KAFKA-4371.
--
Resolution: Not A Problem

> Sporadic ConnectException shuts down the whole connect process
> --
>
> Key: KAFKA-4371
> URL: https://issues.apache.org/jira/browse/KAFKA-4371
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Priority: Critical
>
> I had set up a 2-node distributed kafka-connect process. Everything went well 
> and I could see a lot of data flowing into the relevant kafka topics.
> After some time, JDBCUtils.getCurrentTimeOnDB threw a ConnectException with 
> the following stacktrace:
> The last packet successfully received from the server was 792 milliseconds 
> ago.  The last packet sent successfully to the server was 286 milliseconds 
> ago. (io.confluent.connect.jdbc.source.JdbcSourceTask:234)
> [2016-11-02 12:42:06,116] ERROR Failed to get current time from DB using 
> query select CURRENT_TIMESTAMP; on database MySQL 
> (io.confluent.connect.jdbc.util.JdbcUtils:226)
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
> failure
> The last packet successfully received from the server was 1,855 milliseconds 
> ago.  The last packet sent successfully to the server was 557 milliseconds 
> ago.
>at sun.reflect.GeneratedConstructorAccessor51.newInstance(Unknown 
> Source)
>at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>at 
> com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1117)
>at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3829)
>at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2449)
>at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2629)
>at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2719)
>at 
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2155)
>at 
> com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1379)
>at 
> com.mysql.jdbc.StatementImpl.createResultSetUsingServerFetch(StatementImpl.java:651)
>at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1527)
>at 
> io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:220)
>at 
> io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:157)
>at 
> io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:78)
>at 
> io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:57)
>at 
> io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:207)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
>at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.SocketException: Broken pipe (Write failed)
>at java.net.SocketOutputStream.socketWrite0(Native Method)
>at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>at 
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3810)
>... 20 more
> This was just a minor glitch in the connection, as the EC2 instances are able 
> to connect to the MySQL Aurora instances without any issues.
> But after this exception (which appears a number of times), none of the 
> connectors' tasks are executing. Beyond this, all I see in the logs is 
> [2016-11-02 16:17:41,983] ERROR Failed to run query for table 
> TimestampIncrementingTableQuerier{name='eng_match_series', query='null', 
> topicPrefix='ci-eng-', timestampColumn='modified', incrementingColumn='id'}: 
> com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No 
> operations allowed after statement closed. 
> 

[jira] [Commented] (KAFKA-4371) Sporadic ConnectException shuts down the whole connect process

2016-11-04 Thread Sagar Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15636091#comment-15636091
 ] 

Sagar Rao commented on KAFKA-4371:
--

[~ewencp] By shutting down I meant that the processing of data doesn't happen 
anymore. I should have been a bit more careful in writing that. Since this is a 
connect-jdbc issue, I will close this ticket. I have created a separate issue 
on connect-jdbc:

https://github.com/confluentinc/kafka-connect-jdbc/issues/160

> Sporadic ConnectException shuts down the whole connect process
> --
>
> Key: KAFKA-4371
> URL: https://issues.apache.org/jira/browse/KAFKA-4371
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Priority: Critical
>
> I had set up a 2-node distributed kafka-connect process. Everything went well 
> and I could see a lot of data flowing into the relevant kafka topics.
> After some time, JDBCUtils.getCurrentTimeOnDB threw a ConnectException with 
> the following stacktrace:
> The last packet successfully received from the server was 792 milliseconds 
> ago.  The last packet sent successfully to the server was 286 milliseconds 
> ago. (io.confluent.connect.jdbc.source.JdbcSourceTask:234)
> [2016-11-02 12:42:06,116] ERROR Failed to get current time from DB using 
> query select CURRENT_TIMESTAMP; on database MySQL 
> (io.confluent.connect.jdbc.util.JdbcUtils:226)
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
> failure
> The last packet successfully received from the server was 1,855 milliseconds 
> ago.  The last packet sent successfully to the server was 557 milliseconds 
> ago.
>at sun.reflect.GeneratedConstructorAccessor51.newInstance(Unknown 
> Source)
>at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>at 
> com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1117)
>at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3829)
>at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2449)
>at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2629)
>at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2719)
>at 
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2155)
>at 
> com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1379)
>at 
> com.mysql.jdbc.StatementImpl.createResultSetUsingServerFetch(StatementImpl.java:651)
>at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1527)
>at 
> io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:220)
>at 
> io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:157)
>at 
> io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:78)
>at 
> io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:57)
>at 
> io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:207)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
>at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.SocketException: Broken pipe (Write failed)
>at java.net.SocketOutputStream.socketWrite0(Native Method)
>at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>at 
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3810)
>... 20 more
> This was just a minor glitch in the connection, as the EC2 instances are able 
> to connect to the MySQL Aurora instances without any issues.
> But after this exception (which appears a number of times), none of the 
> connectors' tasks are executing. Beyond this, all I see in the logs is 
> [2016-11-02 16:17:41,983] ERROR Failed to run query for table 

[jira] [Commented] (KAFKA-4371) Sporadic ConnectException shuts down the whole connect process

2016-11-03 Thread Sagar Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15632717#comment-15632717
 ] 

Sagar Rao commented on KAFKA-4371:
--

Another thing to note is that we get this exception when I try to delete an 
existing connector. After that we get a CommunicationLinkException and then all 
the processes stop. Not sure how this is connected though.

> Sporadic ConnectException shuts down the whole connect process
> --
>
> Key: KAFKA-4371
> URL: https://issues.apache.org/jira/browse/KAFKA-4371
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sagar Rao
>Priority: Critical
>
> I had set up a 2-node distributed kafka-connect process. Everything went well 
> and I could see a lot of data flowing into the relevant kafka topics.
> After some time, JDBCUtils.getCurrentTimeOnDB threw a ConnectException with 
> the following stacktrace:
> The last packet successfully received from the server was 792 milliseconds 
> ago.  The last packet sent successfully to the server was 286 milliseconds 
> ago. (io.confluent.connect.jdbc.source.JdbcSourceTask:234)
> [2016-11-02 12:42:06,116] ERROR Failed to get current time from DB using 
> query select CURRENT_TIMESTAMP; on database MySQL 
> (io.confluent.connect.jdbc.util.JdbcUtils:226)
> com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
> failure
> The last packet successfully received from the server was 1,855 milliseconds 
> ago.  The last packet sent successfully to the server was 557 milliseconds 
> ago.
>at sun.reflect.GeneratedConstructorAccessor51.newInstance(Unknown 
> Source)
>at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
>at 
> com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1117)
>at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3829)
>at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2449)
>at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2629)
>at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2719)
>at 
> com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2155)
>at 
> com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1379)
>at 
> com.mysql.jdbc.StatementImpl.createResultSetUsingServerFetch(StatementImpl.java:651)
>at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1527)
>at 
> io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:220)
>at 
> io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:157)
>at 
> io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:78)
>at 
> io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:57)
>at 
> io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:207)
>at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
>at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
>at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
>at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.SocketException: Broken pipe (Write failed)
>at java.net.SocketOutputStream.socketWrite0(Native Method)
>at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
>at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
>at 
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
>at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
>at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3810)
>... 20 more
> This was just a minor glitch in the connection, as the EC2 instances are able 
> to connect to the MySQL Aurora instances without any issues.
> But after this exception (which appears a number of times), none of the 
> connectors' tasks are executing. Beyond this, all I see in the logs is 
> [2016-11-02 16:17:41,983] ERROR Failed to run query for table 
> TimestampIncrementingTableQuerier{name='eng_match_series', query='null', 
> topicPrefix='ci-eng-', 

[jira] [Created] (KAFKA-4371) Sporadic ConnectException shuts down the whole connect process

2016-11-03 Thread Sagar Rao (JIRA)
Sagar Rao created KAFKA-4371:


 Summary: Sporadic ConnectException shuts down the whole connect 
process
 Key: KAFKA-4371
 URL: https://issues.apache.org/jira/browse/KAFKA-4371
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao
Priority: Critical


I had set up a 2-node distributed kafka-connect process. Everything went well 
and I could see a lot of data flowing into the relevant kafka topics.

After some time, JDBCUtils.getCurrentTimeOnDB threw a ConnectException with the 
following stacktrace:

The last packet successfully received from the server was 792 milliseconds ago. 
 The last packet sent successfully to the server was 286 milliseconds ago. 
(io.confluent.connect.jdbc.source.JdbcSourceTask:234)
[2016-11-02 12:42:06,116] ERROR Failed to get current time from DB using query 
select CURRENT_TIMESTAMP; on database MySQL 
(io.confluent.connect.jdbc.util.JdbcUtils:226)
com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link 
failure

The last packet successfully received from the server was 1,855 milliseconds 
ago.  The last packet sent successfully to the server was 557 milliseconds ago.
   at sun.reflect.GeneratedConstructorAccessor51.newInstance(Unknown Source)
   at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
   at com.mysql.jdbc.Util.handleNewInstance(Util.java:411)
   at 
com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:1117)
   at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3829)
   at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2449)
   at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2629)
   at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2719)
   at 
com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:2155)
   at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1379)
   at 
com.mysql.jdbc.StatementImpl.createResultSetUsingServerFetch(StatementImpl.java:651)
   at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1527)
   at 
io.confluent.connect.jdbc.util.JdbcUtils.getCurrentTimeOnDB(JdbcUtils.java:220)
   at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:157)
   at 
io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:78)
   at 
io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:57)
   at 
io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:207)
   at 
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:155)
   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
   at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
   at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Broken pipe (Write failed)
   at java.net.SocketOutputStream.socketWrite0(Native Method)
   at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
   at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
   at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
   at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
   at com.mysql.jdbc.MysqlIO.send(MysqlIO.java:3810)
   ... 20 more

This was just a minor glitch in the connection, as the EC2 instances are able to 
connect to the MySQL Aurora instances without any issues.

But after this exception (which appears a number of times), none of the 
connectors' tasks are executing. Beyond this, all I see in the logs is 

[2016-11-02 16:17:41,983] ERROR Failed to run query for table 
TimestampIncrementingTableQuerier{name='eng_match_series', query='null', 
topicPrefix='ci-eng-', timestampColumn='modified', incrementingColumn='id'}: 
com.mysql.jdbc.exceptions.jdbc4.MySQLNonTransientConnectionException: No 
operations allowed after statement closed. 
(io.confluent.connect.jdbc.source.JdbcSourceTask:234)

Is this expected behaviour? I restarted the connector using the REST APIs but 
that didn't help. How do we handle such scenarios? 

Eventually I had to delete the connector and restart.

The kafka version I am using is 0.10.0.1-cp1 as there were some custom changes 
we needed to make at the Connect level.




--
This message was sent by Atlassian JIRA

[jira] [Created] (KAFKA-4342) Kafka-connect- support tinyint values

2016-10-25 Thread Sagar Rao (JIRA)
Sagar Rao created KAFKA-4342:


 Summary: Kafka-connect- support tinyint values
 Key: KAFKA-4342
 URL: https://issues.apache.org/jira/browse/KAFKA-4342
 Project: Kafka
  Issue Type: Bug
Reporter: Sagar Rao


We have been using kafka-connect-jdbc actively for one of our projects, and one 
of the issues we have noticed is the way it handles tinyint values.

Our database is MySQL, which allows both signed and unsigned tinyint values to 
be stored. So a column can hold values going up to 255, but when kafka-connect 
sees values above 127, it fails. 

The reason is that in the ConnectSchema class, INT8 maps to a Byte, which is a 
signed type. The JDBC docs say the following about handling tinyint values:

https://docs.oracle.com/javase/6/docs/technotes/guides/jdbc/getstart/mapping.html

8.3.4 TINYINT
The JDBC type TINYINT represents an 8-bit integer value between 0 and 255 that 
may be signed or unsigned.

The corresponding SQL type, TINYINT, is currently supported by only a subset of 
the major databases. Portable code may therefore prefer to use the JDBC 
SMALLINT type, which is widely supported.

The recommended Java mapping for the JDBC TINYINT type is as either a Java byte 
or a Java short. The 8-bit Java byte type represents a signed value from -128 
to 127, so it may not always be appropriate for larger TINYINT values, whereas 
the 16-bit Java short will always be able to hold all TINYINT values.
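
A tiny self-contained illustration of the point above (the value is arbitrary): 
an unsigned TINYINT above 127 gets misread when forced into a signed Java byte, 
while widening it into a short via `& 0xFF` preserves it.

```
public class TinyintDemo {
    public static void main(String[] args) {
        int unsignedTinyintFromDb = 200;  // a value MySQL can store in TINYINT UNSIGNED
        byte asSignedByte = (byte) unsignedTinyintFromDb;
        short asShort = (short) (asSignedByte & 0xFF);

        System.out.println(asSignedByte); // prints -56: the value is misinterpreted
        System.out.println(asShort);      // prints 200: a 16-bit short holds all TINYINT values
    }
}
```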

I submitted a PR for this last week, but it failed the Jenkins build due to an 
unrelated test case. If someone could take a look at this or suggest 
something, that would be great:

https://github.com/apache/kafka/pull/2044



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)