[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed
[ https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359245#comment-16359245 ]

Kamal Chandraprakash commented on KAFKA-6106:
---------------------------------------------

[~guozhang] I was busy with other tasks. I will try to complete it by this weekend.

> Postpone normal processing of tasks within a thread until restoration of all tasks have completed
> -------------------------------------------------------------------------------------------------
>
> Key: KAFKA-6106
> URL: https://issues.apache.org/jira/browse/KAFKA-6106
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 0.11.0.1, 1.0.0
> Reporter: Guozhang Wang
> Assignee: Kamal Chandraprakash
> Priority: Major
> Labels: newbie++
>
> Let's say a stream thread hosts multiple tasks, A and B. At the very beginning, when A and B are assigned to the thread, the thread state is {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during this state using the restore consumer while using the normal consumer for heartbeating.
> If task A's restoration completes earlier than task B's, then the thread will start processing A immediately even though it is still in the {{TASKS_ASSIGNED}} phase. But processing task A will slow down the restoration of task B, since the thread is single-threaded. So the thread's transition to {{RUNNING}}, which happens once all of its assigned tasks have completed restoring and can be processed, will be delayed.
> Note that the streams instance's state will only transition to {{RUNNING}} when all of its threads have transitioned to {{RUNNING}}, so the instance's transition will also be delayed by this scenario.
> We'd better not start processing ready tasks immediately, but instead focus on restoration during the {{TASKS_ASSIGNED}} state to shorten the overall time of the instance's state transition.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
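The scheduling rule proposed in the description can be sketched as a single loop iteration: as long as any assigned task is still restoring, the iteration only restores, and normal processing starts once every task is restored. This is a minimal, self-contained sketch and not Kafka Streams code; `RestoreFirstSketch`, `Task`, `runOnce`, and `iterationsUntilRunning` are hypothetical names, and restoration is modelled as a plain countdown.

```java
import java.util.ArrayList;
import java.util.List;

final class RestoreFirstSketch {
    // Hypothetical stand-in for a stream task; not the Kafka Streams class.
    static final class Task {
        final String name;
        int recordsToRestore; // remaining changelog records, modelled as a counter
        int processed = 0;    // records processed after restoration finished

        Task(String name, int recordsToRestore) {
            this.name = name;
            this.recordsToRestore = recordsToRestore;
        }

        boolean restored() { return recordsToRestore == 0; }
        void restoreStep() { recordsToRestore--; }
        void processStep() { processed++; }
    }

    // One iteration of the (hypothetical) thread loop.
    // Returns true if it did normal processing, false if it only restored.
    static boolean runOnce(List<Task> tasks) {
        boolean anyRestoring = false;
        for (Task t : tasks) {
            if (!t.restored()) {
                anyRestoring = true;
                t.restoreStep();
            }
        }
        if (anyRestoring) {
            return false; // still in the TASKS_ASSIGNED-like phase: no processing yet
        }
        for (Task t : tasks) {
            t.processStep();
        }
        return true;
    }

    // Counts the restore-only iterations that precede the first processing iteration.
    static int iterationsUntilRunning(List<Task> tasks) {
        int restoreOnly = 0;
        while (!runOnce(tasks)) {
            restoreOnly++;
        }
        return restoreOnly;
    }

    public static void main(String[] args) {
        List<Task> tasks = new ArrayList<>();
        Task a = new Task("A", 2);
        Task b = new Task("B", 5);
        tasks.add(a);
        tasks.add(b);
        int restoreOnly = iterationsUntilRunning(tasks);
        System.out.println("restore-only iterations: " + restoreOnly
                + ", A processed: " + a.processed + ", B processed: " + b.processed);
    }
}
```

In this model task A is ready after two iterations but is not processed until B has finished restoring, which is the trade-off the ticket argues for: a ready task waits a little longer so that the thread as a whole reaches `RUNNING` sooner.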
[jira] [Updated] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Widman updated KAFKA-6529:
-------------------------------
Fix Version/s: (was: 1.0.2) 1.0.1

> Broker leaks memory and file descriptors after sudden client disconnects
> ------------------------------------------------------------------------
>
> Key: KAFKA-6529
> URL: https://issues.apache.org/jira/browse/KAFKA-6529
> Project: Kafka
> Issue Type: Bug
> Components: network
> Affects Versions: 1.0.0, 0.11.0.2
> Reporter: Graham Campbell
> Priority: Major
> Fix For: 1.1.0, 1.0.1, 0.11.0.3
>
> If a producer forcefully disconnects from a broker while it has staged receives, that connection enters a limbo state where it is no longer processed by the SocketServer.Processor, leaking the file descriptor for the socket and the memory used for the staged receive queue for that connection.
> We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after the rolling restart to upgrade, open file descriptors on the brokers started climbing uncontrollably. In a few cases brokers reached our configured max open files limit of 100k and crashed before we rolled back.
> We tracked this down to a buildup of muted connections in the Selector.closingChannels list. If a client disconnects from the broker with multiple pending produce requests, when the broker attempts to send an ack to the client it receives an IOException because the TCP socket has been closed. This triggers the Selector to close the channel, but because it still has pending requests, it adds it to Selector.closingChannels to process those requests. However, because that exception was triggered by trying to send a response, the SocketServer.Processor has marked the channel as muted and will no longer process it at all.
> *Reproduced by:*
> Starting a Kafka broker/cluster
> Client produces several messages and then disconnects abruptly (e.g. _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_)
> Broker then leaks the file descriptor previously used for the TCP socket and the memory for unprocessed messages
> *Proposed solution (which we've implemented internally)*
> Whenever an exception is encountered when writing to a socket in Selector.pollSelectionKeys(...), record that the connection failed a send by adding the KafkaChannel ID to Selector.failedSends. Then re-raise the exception to still trigger the socket disconnection logic. Since every exception raised in this function triggers a disconnect, we also treat any exception while writing to the socket as a failed send.
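The proposed solution can be sketched in isolation as follows. This is a simplified model, not Kafka's `Selector`: `FailedSendSketch` and its methods are hypothetical, channels are plain string ids, and the way the close path consumes `failedSends` (closing immediately instead of parking the channel in `closingChannels`) is an assumption, since the description only says that the failed send is recorded and the exception re-raised.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class FailedSendSketch {
    // Requests received from a channel but not yet processed.
    final Map<String, Deque<String>> stagedReceives = new HashMap<>();
    // Channels kept around only so their staged receives can be drained.
    final Set<String> closingChannels = new HashSet<>();
    // Channels whose most recent write failed.
    final Set<String> failedSends = new HashSet<>();
    // Channels whose socket and buffers have been released.
    final Set<String> closed = new HashSet<>();

    void stage(String channelId, String request) {
        stagedReceives.computeIfAbsent(channelId, k -> new ArrayDeque<>()).add(request);
    }

    // Simulated socket write that fails when the peer has already gone away.
    void write(String channelId, boolean peerGone) throws IOException {
        try {
            if (peerGone) {
                throw new IOException("Broken pipe");
            }
        } catch (IOException e) {
            failedSends.add(channelId); // record the failed send
            throw e;                    // re-raise so the disconnect logic still runs
        }
    }

    // Assumed close path: a channel with a failed send is released at once,
    // even if it still has staged receives.
    void close(String channelId) {
        Deque<String> staged = stagedReceives.get(channelId);
        boolean hasStaged = staged != null && !staged.isEmpty();
        if (hasStaged && !failedSends.contains(channelId)) {
            closingChannels.add(channelId);
        } else {
            stagedReceives.remove(channelId);
            closingChannels.remove(channelId);
            failedSends.remove(channelId);
            closed.add(channelId);
        }
    }

    // Poll step for one channel: try to send a response, close on any exception.
    void pollOnce(String channelId, boolean peerGone) {
        try {
            write(channelId, peerGone);
        } catch (IOException e) {
            close(channelId);
        }
    }

    public static void main(String[] args) {
        FailedSendSketch selector = new FailedSendSketch();
        selector.stage("conn-1", "produce-1");
        selector.stage("conn-1", "produce-2");
        selector.pollOnce("conn-1", true);
        System.out.println("closed=" + selector.closed
                + " closingChannels=" + selector.closingChannels);
    }
}
```

Without the `failedSends` check in `close`, the channel in this model would be parked in `closingChannels` with two staged requests that nothing drains, which mirrors the leak described above.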
[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText
[ https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16359020#comment-16359020 ]

Bill Bejeck commented on KAFKA-2967:
------------------------------------

I don't know why, but I had the impression that Asciidoc was not an option for us. I've just spent a fair amount of time writing in Asciidoc, and given the choice between Asciidoc and RST I'd prefer Asciidoc. Just my 2 cents.

> Move Kafka documentation to ReStructuredText
> --------------------------------------------
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
> Issue Type: Bug
> Reporter: Gwen Shapira
> Assignee: Gwen Shapira
> Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, can't generate PDFs
> * Reading and editing is painful
> * Validating changes is hard because our formatting relies on all kinds of Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during build using Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358884#comment-16358884 ]

Guozhang Wang commented on KAFKA-6502:
--------------------------------------

Got it, thanks! I think this is a valid point; we will track it as a known issue and consider a fix based on priorities.

> Kafka streams deserialization handler not committing offsets on error records
> -----------------------------------------------------------------------------
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Soby Chacko
> Priority: Minor
>
> See this StackOverflow issue: [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
> I am trying to use the LogAndContinueExceptionHandler on deserialization. It works fine when an error occurs: it logs the error and continues. However, on a continuous stream of errors, it seems that these messages are not committed, and on a restart of the application they reappear. It is more problematic if I try to send the messages in error to a DLQ: on a restart, they are sent to the DLQ again. As soon as a good record comes in, the offset appears to advance, and I no longer see the already logged messages after a restart.
> I reproduced this behavior by running the sample provided here: [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to {{Serdes.Integer().getClass().getName()}} to force a deserialization error on input and reduced the commit interval to just 1 second. I also added the following to the config:
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class);}}.
> It looks like when deserialization exceptions occur, this flag is never set to true here: [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
> It only becomes true once processing succeeds. That might be the reason why the commit is not happening even after I manually call processorContext#commit().
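The behavior the reporter describes can be modelled with a task that requests a commit only after a record is processed successfully. The sketch below is self-contained and hypothetical: `SkippedRecordCommitSketch`, `handle`, `maybeCommit`, and the `markOnSkip` switch are illustrative names, `Integer.parseInt` stands in for deserialization, and the `markOnSkip = true` variant is only one conceivable remedy, not the project's fix.

```java
final class SkippedRecordCommitSketch {
    long position = 0;            // offset of the next record to read
    long committed = 0;           // last committed offset
    boolean commitNeeded = false; // set when there is progress worth committing
    final boolean markOnSkip;     // hypothetical: also request a commit after a skipped record

    SkippedRecordCommitSketch(boolean markOnSkip) {
        this.markOnSkip = markOnSkip;
    }

    // Consumes one record. Records that fail to parse are logged and skipped,
    // in the spirit of a log-and-continue handler.
    void handle(String raw) {
        position++;
        try {
            Integer.parseInt(raw); // stand-in for deserialization plus processing
            commitNeeded = true;
        } catch (NumberFormatException e) {
            System.out.println("skipping bad record: " + raw);
            if (markOnSkip) {
                commitNeeded = true;
            }
        }
    }

    // Commits the current position only if a commit was requested.
    void maybeCommit() {
        if (commitNeeded) {
            committed = position;
            commitNeeded = false;
        }
    }

    // Feeds the records through a fresh task and returns the committed offset.
    static long committedAfter(boolean markOnSkip, String... records) {
        SkippedRecordCommitSketch task = new SkippedRecordCommitSketch(markOnSkip);
        for (String record : records) {
            task.handle(record);
            task.maybeCommit();
        }
        return task.committed;
    }

    public static void main(String[] args) {
        System.out.println("only bad records, commit on success only: "
                + committedAfter(false, "x", "y", "z"));
        System.out.println("bad records followed by a good one: "
                + committedAfter(false, "x", "y", "7"));
        System.out.println("only bad records, commit also after skips: "
                + committedAfter(true, "x", "y", "z"));
    }
}
```

With only bad records and commits tied to successful processing, the committed offset stays at 0, so a restart would re-read and re-log the same records; a single good record moves it past all of them, matching the report.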
[jira] [Commented] (KAFKA-6549) Deadlock while processing Controller Events
[ https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358859#comment-16358859 ]

Manikumar commented on KAFKA-6549:
----------------------------------

This is not exactly a deadlock. ZooKeeperClient requests are getting locked during the ZooKeeper disconnect scenario. Because of this, some of the threads wait forever, blocking the shutdown thread. I am still investigating and will update with more details.

> Deadlock while processing Controller Events
> -------------------------------------------
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Manikumar
> Assignee: Manikumar
> Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
> Stack traces from a single node test cluster that was deadlocked while processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 nid=0x7d03 waiting on condition [0x7278b000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x0007bccadf30> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
> at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
> at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
> at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
> - <0x000780054860> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 nid=0xad03 waiting on condition [0x73fd3000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x0007bcc584a0> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
> at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
> at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
> at kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
> at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
> at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> at
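One possible reading of the two traces is a circular wait through `CountDownLatch` instances rather than through locks: the event thread waits for the controller to process an event, while the controller thread waits for a response that only the event thread could deliver. The sketch below reproduces that shape with two plain threads. It is an interpretation, not an analysis of the Kafka code; `LatchCycleSketch` and the latch names are hypothetical, and timeouts are used so that the example terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class LatchCycleSketch {
    // Runs both threads and reports whether each one was released before its
    // timeout: [0] is the event thread, [1] is the controller thread.
    static boolean[] run(long timeoutMs) {
        CountDownLatch expireProcessed = new CountDownLatch(1);   // counted down by the controller thread
        CountDownLatch responseDelivered = new CountDownLatch(1); // counted down by the event thread
        boolean[] released = new boolean[2];

        Thread eventThread = new Thread(() -> {
            try {
                // Waits for the controller to finish before doing anything else,
                // including delivering the response the controller is waiting for.
                released[0] = expireProcessed.await(timeoutMs, TimeUnit.MILLISECONDS);
                if (released[0]) {
                    responseDelivered.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "event-thread");

        Thread controllerThread = new Thread(() -> {
            try {
                // Waits for a response before it can move on to the next event.
                released[1] = responseDelivered.await(timeoutMs, TimeUnit.MILLISECONDS);
                if (released[1]) {
                    expireProcessed.countDown();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "controller-thread");

        eventThread.start();
        controllerThread.start();
        try {
            eventThread.join();
            controllerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return released;
    }

    public static void main(String[] args) {
        boolean[] released = run(200);
        System.out.println("event thread released: " + released[0]
                + ", controller thread released: " + released[1]);
    }
}
```

Neither latch is ever counted down, so both waits time out. With the untimed `await()` seen in the traces, both threads would park indefinitely, and a thread dump would show two `WAITING (parking)` threads without either one holding a lock the other needs.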
[jira] [Commented] (KAFKA-4651) Improve test coverage of Stores
[ https://issues.apache.org/jira/browse/KAFKA-4651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358817#comment-16358817 ] ASF GitHub Bot commented on KAFKA-4651: --- bbejeck opened a new pull request #4555: KAFKA-4651: [WIP] improve test coverage of stores URL: https://github.com/apache/kafka/pull/4555 Working on increasing the coverage of stores in unit tests. Started with `InMemoryKeyValueLoggedStore` ![screen shot 2018-02-09 at 1 15 11 pm](https://user-images.githubusercontent.com/199238/36044796-c7ddf31e-0da1-11e8-87a5-1d6727a5fe4d.png) ![screen shot 2018-02-09 at 1 24 38 pm](https://user-images.githubusercontent.com/199238/36044805-cd1bc23e-0da1-11e8-9b57-8f8bfa0a21ea.png) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Improve test coverage of Stores > --- > > Key: KAFKA-4651 > URL: https://issues.apache.org/jira/browse/KAFKA-4651 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Damian Guy >Priority: Minor > Labels: newbie > > Some factory methods aren't tested -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic
[ https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Damian Guy updated KAFKA-4879:
------------------------------
Fix Version/s: (was: 1.1.0) 2.0.0

> KafkaConsumer.position may hang forever when deleting a topic
> -------------------------------------------------------------
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.10.2.0
> Reporter: Shixiong Zhu
> Assignee: Jason Gustafson
> Priority: Major
> Fix For: 2.0.0
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is this line:
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is "Long.MAX_VALUE", and it will just retry forever for UnknownTopicOrPartitionException.
> Here is a reproducer:
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
>
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic test with "3" partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}
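The hang comes from retrying a retriable error with an effectively infinite timeout. A bounded variant of such a retry loop might look like the sketch below. It is self-contained and does not use the Kafka client: `BoundedRetrySketch`, `retryUntil`, `RetriableException`, and `RetryTimeoutException` are hypothetical names, and the sketch makes no claim about how the consumer actually resolved the issue.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

final class BoundedRetrySketch {
    // Hypothetical stand-in for a retriable error such as an unknown topic or partition.
    static final class RetriableException extends RuntimeException {
        RetriableException(String message) { super(message); }
    }

    // Raised when the deadline passes without a successful call.
    static final class RetryTimeoutException extends RuntimeException {
        RetryTimeoutException(String message, Throwable cause) { super(message, cause); }
    }

    // Retries the call on RetriableException until it succeeds or timeoutMs elapses.
    // Intended for finite timeouts; a huge value would overflow the deadline arithmetic.
    static <T> T retryUntil(Supplier<T> call, long timeoutMs, long backoffMs) {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            try {
                return call.get();
            } catch (RetriableException e) {
                if (System.nanoTime() - deadlineNanos >= 0) {
                    throw new RetryTimeoutException("no result within " + timeoutMs + " ms", e);
                }
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                throw new RetryTimeoutException("interrupted while retrying", ie);
            }
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String value = retryUntil(() -> {
            if (++attempts[0] < 3) {
                throw new RetriableException("metadata not ready");
            }
            return "position resolved";
        }, 1_000, 10);
        System.out.println(value + " after " + attempts[0] + " attempts");

        try {
            retryUntil(() -> {
                throw new RetriableException("topic was deleted");
            }, 50, 10);
        } catch (RetryTimeoutException e) {
            System.out.println("gave up: " + e.getMessage());
        }
    }
}
```

The design choice here is that a condition which may be permanent (the topic no longer exists) surfaces to the caller as a timeout instead of being retried forever.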
[jira] [Updated] (KAFKA-5214) Re-add KafkaAdminClient#apiVersions
[ https://issues.apache.org/jira/browse/KAFKA-5214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Colin P. McCabe updated KAFKA-5214: --- Priority: Minor (was: Blocker) Fix Version/s: (was: 1.1.0) 1.2.0 Issue Type: Improvement (was: Bug) > Re-add KafkaAdminClient#apiVersions > --- > > Key: KAFKA-5214 > URL: https://issues.apache.org/jira/browse/KAFKA-5214 > Project: Kafka > Issue Type: Improvement >Reporter: Colin P. McCabe >Assignee: Colin P. McCabe >Priority: Minor > Fix For: 1.2.0 > > > We removed KafkaAdminClient#apiVersions just before 0.11.0.0 to give us a bit > more time to iterate on it before it's included in a release. We should add > the relevant methods back. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6549) Deadlock while processing Controller Events
[ https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358729#comment-16358729 ]

Ted Yu commented on KAFKA-6549:
-------------------------------

kafka-shutdown-hook didn't hold onto any lock. Can you tell me how the deadlock is formed?

bq. waiting on condition [0x7278b000]

The above (0x7278b000) only appeared once in the dump.

> Deadlock while processing Controller Events
> -------------------------------------------
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Manikumar
> Assignee: Manikumar
> Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
> Stack traces from a single node test cluster that was deadlocked while processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 nid=0x7d03 waiting on condition [0x7278b000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x0007bccadf30> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
> at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
> at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
> at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
> at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
> - <0x000780054860> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 nid=0xad03 waiting on condition [0x73fd3000]
> java.lang.Thread.State: WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x0007bcc584a0> (a java.util.concurrent.CountDownLatch$Sync)
> at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
> at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
> at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
> at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
> at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
> at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
> at kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
> at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
> at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
> at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
> at
[jira] [Resolved] (KAFKA-6407) Sink task metrics are the same for all connectors
[ https://issues.apache.org/jira/browse/KAFKA-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Robert Yokota resolved KAFKA-6407.
----------------------------------
Resolution: Duplicate

> Sink task metrics are the same for all connectors
> -------------------------------------------------
>
> Key: KAFKA-6407
> URL: https://issues.apache.org/jira/browse/KAFKA-6407
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 1.0.0
> Reporter: Alexander Koval
> Priority: Minor
>
> I have a lot of sink connectors inside a distributed worker. When I tried to save metrics to Graphite I discovered all task metrics are identical.
> {code}
> $>get -b kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0 sink-record-read-total
> #mbean = kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0:
> sink-record-read-total = 228744.0;
> $>get -b kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0 sink-record-read-total
> #mbean = kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0:
> sink-record-read-total = 228744.0;
> $>get -b kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0 sink-record-read-total
> #mbean = kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0:
> sink-record-read-total = 228744.0;
> {code}
[jira] [Updated] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.
[ https://issues.apache.org/jira/browse/KAFKA-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch updated KAFKA-5575:
---------------------------------
Labels: needs-kip (was: )

> SchemaBuilder should have a method to clone an existing Schema.
> ---------------------------------------------------------------
>
> Key: KAFKA-5575
> URL: https://issues.apache.org/jira/browse/KAFKA-5575
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Jeremy Custenborder
> Assignee: Jeremy Custenborder
> Priority: Minor
> Labels: needs-kip
>
> Now that Transformations have landed in Kafka Connect we should have an easy way to do quick modifications to schemas. For example, changing the name of a schema shouldn't take much more than the line below, and I should be able to do more things like this.
> {code:java}
> return SchemaBuilder.from(Schema.STRING_SCHEMA).name("MyNewName").build()
> {code}
[jira] [Commented] (KAFKA-6407) Sink task metrics are the same for all connectors
[ https://issues.apache.org/jira/browse/KAFKA-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358714#comment-16358714 ]

Robert Yokota commented on KAFKA-6407:
--------------------------------------

Yes, I believe so. I'll resolve it as a duplicate.
[jira] [Comment Edited] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.
[ https://issues.apache.org/jira/browse/KAFKA-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358653#comment-16358653 ]

Randall Hauch edited comment on KAFKA-5575 at 2/9/18 5:13 PM:
--------------------------------------------------------------

-[~ewencp], is this an example of something that we would not need a KIP for because it is a simple addition, or should we just do a KIP for everything?-

Saw that [~ewencp] already wrote up a KIP for this: https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73635931

was (Author: rhauch):
[~ewencp], is this an example of something that we would not need a KIP for because it is a simple addition, or should we just do a KIP for everything?
[jira] [Comment Edited] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358670#comment-16358670 ]

Randall Hauch edited comment on KAFKA-5896 at 2/9/18 4:59 PM:
--------------------------------------------------------------

[~ewencp], do you have any thoughts on this? I know in the past you've talked about not wanting to do this since some developers won't properly implement the interruption. I agree that not everyone implements it correctly, but we could at least _try_ to cancel the tasks. And this latest PR seems like a good approach.

was (Author: rhauch):
[~ewencp], do you have any thoughts on this? I know in the past you've talked about not wanting to do this since some developers won't properly implement the interruption. I agree that not everyone implements it correctly, but we could at least _try_ to cancel the tasks.

> Kafka Connect task threads never interrupted
> --------------------------------------------
>
> Key: KAFKA-5896
> URL: https://issues.apache.org/jira/browse/KAFKA-5896
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Nick Pillitteri
> Assignee: Nick Pillitteri
> Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads. When tasks are stopped or restarted, a flag is set - {{stopping}} - to indicate the task should stop processing records. However, if the thread the task is running in is blocked (waiting for a lock or performing I/O) it's possible the task will never stop.
> I've created a connector specifically to demonstrate this issue (along with some more detailed instructions for reproducing the issue): https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved connector (any connector that does I/O without timeouts) can cause the Kafka Connect worker to get into a state where the only solution is to restart the JVM.
> I think, but couldn't reproduce, that this is the cause of this problem on Stack Overflow: https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is running in. In the past, across various other libraries, this is what I've seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run indefinitely. It uses a timeout while waiting for the task to stop, but after this timeout has expired it simply sets a {{cancelled}} flag. This means that every time a task is restarted, a new thread running the task will be created. Thus a task may end up with multiple instances all running in their own threads when there's only supposed to be a single thread.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here: https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to shut down gracefully here: https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and submit a PR if people agree that this is a bug and interrupting threads is the right fix.
> Thanks!
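The behavior described under "Expected Result" can be sketched as a two-phase stop: set the cooperative flag, wait for a grace period, and interrupt the thread only if it is still alive. The sketch below is self-contained and is not the Connect Worker or the linked patch; `InterruptOnTimeoutSketch`, `BlockingTask`, `stop`, and `demo` are hypothetical names, and a latch that is never released stands in for the blocking call.

```java
import java.util.concurrent.CountDownLatch;

final class InterruptOnTimeoutSketch {
    // Hypothetical task that never observes the cooperative flag because it is blocked.
    static final class BlockingTask implements Runnable {
        volatile boolean stopping = false;
        volatile boolean sawInterrupt = false;
        final CountDownLatch started = new CountDownLatch(1);
        private final CountDownLatch neverReleased = new CountDownLatch(1);

        @Override
        public void run() {
            started.countDown();
            try {
                neverReleased.await(); // stand-in for a lock wait or interruptible I/O
            } catch (InterruptedException e) {
                sawInterrupt = true;   // the task gets a chance to clean up and exit
            }
        }
    }

    // Asks the task to stop, waits gracefulMs, then interrupts the thread and waits forcedMs.
    // Returns true if the thread has terminated by the time the method returns.
    static boolean stop(Thread thread, BlockingTask task, long gracefulMs, long forcedMs) {
        task.stopping = true;
        try {
            thread.join(gracefulMs);
            if (thread.isAlive()) {
                thread.interrupt();
                thread.join(forcedMs);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    // Starts a blocked task and stops it; true means the forced phase ended the thread.
    static boolean demo() {
        BlockingTask task = new BlockingTask();
        Thread thread = new Thread(task, "task-thread");
        thread.setDaemon(true);
        thread.start();
        try {
            task.started.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return stop(thread, task, 50, 1_000) && task.sawInterrupt;
    }

    public static void main(String[] args) {
        System.out.println("task thread terminated after interrupt: " + demo());
    }
}
```

An interrupt only unblocks calls that respond to interruption, such as `Object.wait`, `Thread.sleep`, the `java.util.concurrent` waits, and interruptible NIO channels. A task blocked in a classic `java.io` socket read would not be released this way, which is consistent with the comment that interruption is something the Worker could at least try.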
[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted
[ https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358670#comment-16358670 ] Randall Hauch commented on KAFKA-5896: -- [~ewencp], do you have any thoughts on this? I know in the past you've talked about not wanting to do this since some developers won't properly implement the interruption. I agree that not everyone implements it correctly, but we could at least _try_ to cancel the tasks. > Kafka Connect task threads never interrupted > > > Key: KAFKA-5896 > URL: https://issues.apache.org/jira/browse/KAFKA-5896 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nick Pillitteri >Assignee: Nick Pillitteri >Priority: Minor > > h2. Problem > Kafka Connect tasks associated with connectors are run in their own threads. > When tasks are stopped or restarted, a flag is set - {{stopping}} - to > indicate the task should stop processing records. However, if the thread the > task is running in is blocked (waiting for a lock or performing I/O) it's > possible the task will never stop. > I've created a connector specifically to demonstrate this issue (along with > some more detailed instructions for reproducing the issue): > https://github.com/smarter-travel-media/hang-connector > I believe this is an issue because it means that a single badly behaved > connector (any connector that does I/O without timeouts) can cause the Kafka > Connect worker to get into a state where the only solution is to restart the > JVM. > I think, but couldn't reproduce, that this is the cause of this problem on > Stack Overflow: > https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work > h2. Expected Result > I would expect the Worker to eventually interrupt the thread that the task is > running in. In the past across various other libraries, this is what I've > seen done when a thread needs to be forcibly stopped. 
> h2. Actual Result > In actuality, the Worker sets a {{stopping}} flag and lets the thread run > indefinitely. It uses a timeout while waiting for the task to stop but after > this timeout has expired it simply sets a {{cancelled}} flag. This means that > every time a task is restarted, a new thread running the task will be > created. Thus a task may end up with multiple instances all running in their > own threads when there's only supposed to be a single thread. > h2. Steps to Reproduce > The problem can be replicated by using the connector available here: > https://github.com/smarter-travel-media/hang-connector > Apologies for how involved the steps are. > I've created a patch that forcibly interrupts threads after they fail to > shut down gracefully here: > https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5 > I've confirmed that this fixes the issue. I can add some unit tests and > submit a PR if people agree that this is a bug and interrupting threads is > the right fix. > Thanks! -- This message was sent by Atlassian JIRA (v7.6.3#76005)
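The "stop flag first, interrupt as a fallback" pattern discussed in KAFKA-5896 can be sketched with plain JDK threads. This is only an illustration under stated assumptions: `GracefulThenInterrupt`, `BlockingTask`, `stop`, and `demo` are hypothetical names, not Kafka Connect classes, and the sketch is not the linked patch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper, not Kafka Connect code: cooperative stop first, interrupt as a fallback. */
final class GracefulThenInterrupt {

    /** A task that checks its stopping flag only between blocking calls. */
    static final class BlockingTask implements Runnable {
        final AtomicBoolean stopping = new AtomicBoolean(false);
        final CountDownLatch blocked = new CountDownLatch(1);
        volatile boolean interrupted = false;

        @Override
        public void run() {
            while (!stopping.get()) {
                try {
                    blocked.countDown();
                    // Stands in for I/O without a timeout; the flag is not seen while blocked here.
                    Thread.sleep(60_000);
                } catch (InterruptedException e) {
                    interrupted = true;
                    return;
                }
            }
        }
    }

    /**
     * Sets the flag, waits up to timeoutMs (must be > 0) for a graceful exit,
     * then interrupts the worker and waits once more. Returns true if the worker ended.
     */
    static boolean stop(BlockingTask task, Thread worker, long timeoutMs) throws InterruptedException {
        task.stopping.set(true);
        worker.join(timeoutMs);
        if (worker.isAlive()) {
            worker.interrupt();
            worker.join(timeoutMs);
        }
        return !worker.isAlive();
    }

    /** Runs one blocked task and reports whether it ended only because of the interrupt. */
    static boolean demo() {
        BlockingTask task = new BlockingTask();
        Thread worker = new Thread(task, "hypothetical-task-thread");
        worker.start();
        try {
            task.blocked.await();
            return stop(task, worker, 200) && task.interrupted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

As the comment above notes, an interrupt only helps when the blocked call responds to interruption; a task that swallows `InterruptedException` or blocks in non-interruptible I/O would still hang.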
[jira] [Commented] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.
[ https://issues.apache.org/jira/browse/KAFKA-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358653#comment-16358653 ] Randall Hauch commented on KAFKA-5575: -- [~ewencp], is this an example of something that we would not need a KIP for because it is a simple addition, or should we just do a KIP for everything? > SchemaBuilder should have a method to clone an existing Schema. > --- > > Key: KAFKA-5575 > URL: https://issues.apache.org/jira/browse/KAFKA-5575 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Jeremy Custenborder >Assignee: Jeremy Custenborder >Priority: Minor > > Now that Transformations have landed in Kafka Connect we should have an easy > way to do quick modifications to schemas. For example, changing the name of a > schema shouldn't take much more than the following, and I should be able to do > more things like this. > {code:java} > return SchemaBuilder.from(Schema.STRING_SCHEMA).name("MyNewName").build(); > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-4159) Allow overriding producer & consumer properties at the connector level
[ https://issues.apache.org/jira/browse/KAFKA-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-4159: - Labels: needs-kip (was: ) > Allow overriding producer & consumer properties at the connector level > -- > > Key: KAFKA-4159 > URL: https://issues.apache.org/jira/browse/KAFKA-4159 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect >Reporter: Shikhar Bhushan >Assignee: Stephen Durfey >Priority: Major > Labels: needs-kip > > As an example use case, overriding a sink connector's consumer's partition > assignment strategy. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records
[ https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358638#comment-16358638 ] Soby Chacko commented on KAFKA-6502: Hi Guozhang, your reasoning is correct. However, if I introduce a custom DLQ type of exception handler (in which case, if I have a bad message, it is sent to an error-topic), then if I happen to restart the application, I don't want the poison pills to be re-sent to the DLQ as they have already been sent. Anyway, this is a corner case I ran into and may not happen frequently. Thanks! > Kafka streams deserialization handler not committing offsets on error records > - > > Key: KAFKA-6502 > URL: https://issues.apache.org/jira/browse/KAFKA-6502 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Soby Chacko >Priority: Minor > > See this StackOverflow issue: > [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler] > and this comment: > [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899] > I am trying to use the LogAndContinueExceptionHandler on deserialization. It > works fine when an error occurs by successfully logging and continuing. > However, on a continuous stream of errors, it seems like these messages are > not committed and on a restart of the application they reappear. It is > more problematic if I try to send the messages in error to a DLQ. On a > restart, they are sent again to DLQ. As soon as I have a good record coming > in, it looks like the offset moves further and I no longer see the already logged > messages after a restart. 
> I reproduced this behavior by running the sample provided here: > [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java] > I changed the incoming value Serde to > {{Serdes.Integer().getClass().getName()}} to force a deserialization error on > input and reduced the commit interval to just 1 second. Also added the > following to the config. > {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, > LogAndContinueExceptionHandler.class);}}. > It looks like when deserialization exceptions occur, this flag is never set > to be true here: > [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228]. > It only becomes true once processing succeeds. That might be the reason why > commit is not happening even after I manually call processorContext#commit(). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6208) Reduce startup time for Kafka Connect workers
[ https://issues.apache.org/jira/browse/KAFKA-6208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358637#comment-16358637 ] Randall Hauch commented on KAFKA-6208: -- [~Satyajit], it might be possible to start scanning asynchronously, but then whenever code needs a plugin it would have to block. Doable, but that's logged under KAFKA-5451 (now linked). > Reduce startup time for Kafka Connect workers > - > > Key: KAFKA-6208 > URL: https://issues.apache.org/jira/browse/KAFKA-6208 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Priority: Major > > Kafka Connect startup times are excessive with a handful of connectors on the > plugin path or classpath. We should not be scanning three times (once for > connectors, once for SMTs, and once for converters), and hopefully we can > avoid scanning directories that are clearly not plugin directories. > We should also consider using Java's Service Loader to quickly identify > connectors. The latter would require a KIP and would require time for > connectors to migrate, but we could be smarter about only scanning plugin > directories that need to be scanned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
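The trade-off mentioned in the comment above (start the scan asynchronously, but block whenever code needs a plugin) might look roughly like the following. It is a minimal sketch using only `java.util.concurrent`; `AsyncPluginRegistry`, its methods, and the `com.example.FileSourceConnector` name are hypothetical and do not reflect how Kafka Connect or KAFKA-5451 implements scanning.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical registry, not part of Kafka Connect: scan in the background, block on first use. */
final class AsyncPluginRegistry {
    private final CompletableFuture<Map<String, String>> scan;

    AsyncPluginRegistry(Supplier<Map<String, String>> scanner) {
        // Kick off the (slow) scan immediately on a background thread.
        this.scan = CompletableFuture.supplyAsync(scanner);
    }

    /** Blocks until the scan has completed, then maps an alias to a class name (null if unknown). */
    String classNameFor(String alias) {
        return scan.join().get(alias);
    }

    boolean scanFinished() {
        return scan.isDone();
    }

    /** Simulates a slow scan and checks that lookups wait for it. */
    static boolean demo() {
        AsyncPluginRegistry registry = new AsyncPluginRegistry(() -> {
            try {
                Thread.sleep(100); // stands in for walking the plugin path
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return Collections.singletonMap("file-source", "com.example.FileSourceConnector");
        });
        String found = registry.classNameFor("file-source");
        return "com.example.FileSourceConnector".equals(found)
                && registry.classNameFor("missing") == null
                && registry.scanFinished();
    }
}
```

With this shape the worker could start serving requests that do not touch plugins right away, while the first request that does need a plugin pays the remaining scan time.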
[jira] [Commented] (KAFKA-6407) Sink task metrics are the same for all connectors
[ https://issues.apache.org/jira/browse/KAFKA-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358619#comment-16358619 ] Randall Hauch commented on KAFKA-6407: -- [~rayokota], does this sound like a duplicate of KAFKA-6504? > Sink task metrics are the same for all connectors > - > > Key: KAFKA-6407 > URL: https://issues.apache.org/jira/browse/KAFKA-6407 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Alexander Koval >Priority: Minor > > I have a lot of sink connectors inside a distributed worker. When I tried to > save metrics to graphite I discovered all task metrics are identical. > {code} > $>get -b > kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0 > sink-record-read-total > #mbean = > kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0: > sink-record-read-total = 228744.0; > $>get -b > kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0 > sink-record-read-total > #mbean = > kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0: > sink-record-read-total = 228744.0; > $>get -b > kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0 > sink-record-read-total > #mbean = > kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0: > sink-record-read-total = 228744.0; > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-4632) Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
[ https://issues.apache.org/jira/browse/KAFKA-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-4632. -- Resolution: Fixed Fix Version/s: 0.10.0.1 0.10.1.0 I'm going to close this as fixed in 0.10.0.1. [~ScottReynolds], if you disagree, please feel free to reopen with more detail. > Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException > --- > > Key: KAFKA-4632 > URL: https://issues.apache.org/jira/browse/KAFKA-4632 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0 >Reporter: Scott Reynolds >Priority: Major > Fix For: 0.10.1.0, 0.10.0.1 > > > WorkerSinkTask's closePartitions method isn't handling WakeupException that > can be thrown from commitSync. > {code} > org.apache.kafka.common.errors.WakeupException > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup > (ConsumerNetworkClient.java:404) > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll > (ConsumerNetworkClient.java:245) > at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll > (ConsumerNetworkClient.java:180) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync > (ConsumerCoordinator.java:499) > at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync > (KafkaConsumer.java:1104) > at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync > (WorkerSinkTask.java:245) > at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit > (WorkerSinkTask.java:264) > at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets > (WorkerSinkTask.java:305) > at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions > (WorkerSinkTask.java:435) > at org.apache.kafka.connect.runtime.WorkerSinkTask.execute > (WorkerSinkTask.java:147) > at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140) > at 
org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175) > at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511) > at java.util.concurrent.FutureTask.run (FutureTask.java:266) > at java.util.concurrent.ThreadPoolExecutor.runWorker > (ThreadPoolExecutor.java:1142) > at java.util.concurrent.ThreadPoolExecutor$Worker.run > (ThreadPoolExecutor.java:617) > at java.lang.Thread.run (Thread.java:745) > {code} > I believe it should catch it and ignore it as that is what the poll method > does when isStopping is true > {code:java} > } catch (WakeupException we) { > log.trace("{} consumer woken up", id); > if (isStopping()) > return; > if (shouldPause()) { > pauseAll(); > } else if (!pausedForRedelivery) { > resumeAll(); > } > } > {code} > But I'm unsure and would love some insight into this. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
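The reporter's suggestion for KAFKA-4632 (catch the wakeup during the closing commit and ignore it when the task is already stopping) can be illustrated without the Kafka client library. Everything here is a stand-in: `WakeupSignal` substitutes for `WakeupException`, `Committer` for the consumer's synchronous commit, and the sketch does not claim to be the fix that was actually merged.

```java
/** Hypothetical model of the suggestion in the report; none of these types are Kafka classes. */
final class CloseWithWakeup {

    /** Stand-in for org.apache.kafka.common.errors.WakeupException. */
    static final class WakeupSignal extends RuntimeException {
    }

    /** Stand-in for a synchronous offset commit. */
    interface Committer {
        void commitSync();
    }

    /**
     * Commits while closing partitions. Returns true if the commit completed,
     * false if a wakeup was swallowed because the task is stopping anyway.
     * When the task is not stopping, the wakeup is propagated to the caller.
     */
    static boolean closePartitions(Committer committer, boolean stopping) {
        try {
            committer.commitSync();
            return true;
        } catch (WakeupSignal wakeup) {
            if (stopping) {
                return false; // ignore: shutdown is already in progress
            }
            throw wakeup;
        }
    }

    /** Exercises the three paths: clean commit, swallowed wakeup, propagated wakeup. */
    static boolean demo() {
        Committer ok = () -> { };
        Committer woken = () -> { throw new WakeupSignal(); };
        boolean propagated = false;
        try {
            closePartitions(woken, false);
        } catch (WakeupSignal expected) {
            propagated = true;
        }
        return closePartitions(ok, true) && !closePartitions(woken, true) && propagated;
    }
}
```

One consequence of simply ignoring the wakeup is that the final offsets may not be committed, so a real implementation would have to decide whether to retry the commit before giving up.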
[jira] [Commented] (KAFKA-6208) Reduce startup time for Kafka Connect workers
[ https://issues.apache.org/jira/browse/KAFKA-6208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358600#comment-16358600 ] Randall Hauch commented on KAFKA-6208: -- [~Satyajit], I would argue that anything with the scanning approach should be fixed as part of KAFKA-6503, for which there is a simple fix. This issue, OTOH, is more about moving to a non-scanning approach, and this requires a longer-term phase-in of the new mechanism and phase-out of the existing scanning mechanism. Regarding your specific suggestion, I don't believe that loading the different components asynchronously would be of any benefit over the much simpler suggestion in KAFKA-6503, which already is going to utilize multiple threads. > Reduce startup time for Kafka Connect workers > - > > Key: KAFKA-6208 > URL: https://issues.apache.org/jira/browse/KAFKA-6208 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Randall Hauch >Priority: Major > > Kafka Connect startup times are excessive with a handful of connectors on the > plugin path or classpath. We should not be scanning three times (once for > connectors, once for SMTs, and once for converters), and hopefully we can > avoid scanning directories that are clearly not plugin directories. > We should also consider using Java's Service Loader to quickly identify > connectors. The latter would require a KIP and would require time for > connectors to migrate, but we could be smarter about only scanning plugin > directories that need to be scanned. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-6503) Connect: Plugin scan is very slow
[ https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6503: - Fix Version/s: 1.1.0 > Connect: Plugin scan is very slow > - > > Key: KAFKA-6503 > URL: https://issues.apache.org/jira/browse/KAFKA-6503 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Per Steffensen >Priority: Critical > Fix For: 1.1.0 > > > Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is > very slow - see logs from starting my Kafka-Connect instance at the bottom. > It takes almost 4 minutes scanning. I am running Kafka-Connect in docker > based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to > /usr/share/java. The only thing I have added is a 13MB jar in > /usr/share/java/kafka-connect-file-streamer-client containing two connectors > and a converter. That one alone seems to take 20 secs. > If it was just scanning in the background, and everything was working it > probably would not be a big issue. But it does not. Right after starting the > Kafka-Connect instance I try to create a connector via the /connectors > endpoint, but it will not succeed before the plugin scanning has finished (4 > minutes) > I am not even sure why scanning is necessary. Is it not always true that > connectors, converters etc are mentioned by name, so to see if it exists, > just try to load the class - the classloader will tell if it is available. > Hmmm, there is probably a reason. > Anyway, either it should be made much faster, or at least Kafka-Connect > should be fully functional (or as functional as possible) while scanning is > going on. > {code} > [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a > moment ... 
(org.apache.kafka.connect.cli.ConnectDistributed) > [2018-01-30 13:52:27,218] INFO Loading plugin from: > /usr/share/java/kafka-connect-file-streamer-client > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,037] INFO Registered loader: > PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/} > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,038] INFO Added plugin > 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,039] INFO Added plugin > 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,040] INFO Added plugin > 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,049] INFO Loading plugin from: > /usr/share/java/kafka-connect-elasticsearch > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:47,595] INFO Registered loader: > PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/} > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:47,611] INFO Added plugin > 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:47,651] INFO Loading plugin from: > /usr/share/java/kafka-connect-jdbc > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:49,491] INFO Registered loader: > PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > 
[2018-01-30 13:52:49,491] INFO Added plugin > 'io.confluent.connect.jdbc.JdbcSinkConnector' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:49,492] INFO Added plugin > 'io.confluent.connect.jdbc.JdbcSourceConnector' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:49,663] INFO Loading plugin from: > /usr/share/java/kafka-connect-s3 > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:53:51,055] INFO Registered loader: > PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-s3/} > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:53:51,055] INFO Added plugin > 'io.confluent.connect.s3.S3SinkConnector' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:53:51,061] INFO Added plugin > 'io.confluent.connect.storage.tools.SchemaSourceConnector' >
[jira] [Updated] (KAFKA-6503) Connect: Plugin scan is very slow
[ https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-6503: - Issue Type: Bug (was: Improvement) > Connect: Plugin scan is very slow > - > > Key: KAFKA-6503 > URL: https://issues.apache.org/jira/browse/KAFKA-6503 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 1.0.0 >Reporter: Per Steffensen >Priority: Critical > Fix For: 1.1.0 > > > Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is > very slow - see logs from starting my Kafka-Connect instance at the bottom. > It takes almost 4 minutes scanning. I am running Kafka-Connect in docker > based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to > /usr/share/java. The only thing I have added is a 13MB jar in > /usr/share/java/kafka-connect-file-streamer-client containing two connectors > and a converter. That one alone seems to take 20 secs. > If it was just scanning in the background, and everything was working it > probably would not be a big issue. But it does not. Right after starting the > Kafka-Connect instance I try to create a connector via the /connectors > endpoint, but it will not succeed before the plugin scanning has finished (4 > minutes) > I am not even sure why scanning is necessary. Is it not always true that > connectors, converters etc are mentioned by name, so to see if it exists, > just try to load the class - the classloader will tell if it is available. > Hmmm, there is probably a reason. > Anyway, either it should be made much faster, or at least Kafka-Connect > should be fully functional (or as functional as possible) while scanning is > going on. > {code} > [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a > moment ... 
(org.apache.kafka.connect.cli.ConnectDistributed) > [2018-01-30 13:52:27,218] INFO Loading plugin from: > /usr/share/java/kafka-connect-file-streamer-client > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,037] INFO Registered loader: > PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/} > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,038] INFO Added plugin > 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,039] INFO Added plugin > 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,040] INFO Added plugin > 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:43,049] INFO Loading plugin from: > /usr/share/java/kafka-connect-elasticsearch > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:47,595] INFO Registered loader: > PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/} > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:47,611] INFO Added plugin > 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:47,651] INFO Loading plugin from: > /usr/share/java/kafka-connect-jdbc > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:49,491] INFO Registered loader: > PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > 
[2018-01-30 13:52:49,491] INFO Added plugin > 'io.confluent.connect.jdbc.JdbcSinkConnector' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:49,492] INFO Added plugin > 'io.confluent.connect.jdbc.JdbcSourceConnector' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:52:49,663] INFO Loading plugin from: > /usr/share/java/kafka-connect-s3 > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:53:51,055] INFO Registered loader: > PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-s3/} > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:53:51,055] INFO Added plugin > 'io.confluent.connect.s3.S3SinkConnector' > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) > [2018-01-30 13:53:51,061] INFO Added plugin > 'io.confluent.connect.storage.tools.SchemaSourceConnector' >
[jira] [Commented] (KAFKA-6547) group offset reset and begin_offset ignored/no effect
[ https://issues.apache.org/jira/browse/KAFKA-6547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358415#comment-16358415 ] Dan commented on KAFKA-6547: I think I've eliminated my client from the problem definition. I start the client first to get the group started, then I stop it. Describe shows: {code:java} kafka@kafka-server-1:~$ KAFKA_OPTS="-Xmx128M -Djava.security.auth.login.config=./config/kafka_client_jaas.conf" kafka-consumer-groups.sh --command-config ./config/consumer.properties --bootstrap-server kafka-server-1:9092 --group meta-printer --describe Note: This will not show information about old Zookeeper-based consumers. Consumer group 'meta-printer' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID payload-meta 0 836464 1060485 224021 - - - {code} Then I run my client for about 20s, with an offset commit frequency of 10s. Here is a describe after that: {code:java} kafka@kafka-server-1:~$ KAFKA_OPTS="-Xmx128M -Djava.security.auth.login.config=./config/kafka_client_jaas.conf" kafka-consumer-groups.sh --command-config ./config/consumer.properties --bootstrap-server kafka-server-1:9092 --group meta-printer --describe Note: This will not show information about old Zookeeper-based consumers. Consumer group 'meta-printer' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID payload-meta 0 974424 1060485 86061 - - - {code} Now I'm going to try to reset the group to the earliest available: {code:java} kafka@kafka-server-1:~$ KAFKA_OPTS="-Xmx128M -Djava.security.auth.login.config=./config/kafka_client_jaas.conf" kafka-consumer-groups.sh --command-config ./config/consumer.properties --bootstrap-server kafka-server-1:9092 --group meta-printer --reset-offsets --execute --to-earliest --topic payload-meta Note: This will not show information about old Zookeeper-based consumers. 
TOPIC PARTITION NEW-OFFSET payload-meta 0 0 {code} And then a subsequent describe: {code:java} kafka@kafka-server-1:~$ KAFKA_OPTS="-Xmx128M -Djava.security.auth.login.config=./config/kafka_client_jaas.conf" kafka-consumer-groups.sh --command-config ./config/consumer.properties --bootstrap-server kafka-server-1:9092 --group meta-printer --describe Note: This will not show information about old Zookeeper-based consumers. Consumer group 'meta-printer' has no active members. TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID payload-meta 0 974424 1060485 86061 - - - {code} Unchanged. The default server retention is forever, so no log should be pruned. But even if there was pruning, a reset to earliest should be able to at least get the offset back to the first offset shown above. The problem is not present in 0.11. If you want the output of that, I'll have to set it up. Am I misunderstanding what should be happening here? Many thanks. > group offset reset and begin_offset ignored/no effect > - > > Key: KAFKA-6547 > URL: https://issues.apache.org/jira/browse/KAFKA-6547 > Project: Kafka > Issue Type: Bug > Components: offset manager >Affects Versions: 1.0.0 > Environment: ubuntu 16, java 1.8 >Reporter: Dan >Priority: Major > Fix For: 0.11.0.2 > > > Use of kafka-consumer-groups.sh with --reset-offsets --execute <--to-earliest > or anything> has no effect in 1.0. When my group client connects and requests > a specific offset or the earliest offset, there's no effect and the consumer is unable > to poll, so no messages are consumed, not even new ones. > I installed 0.11 and these problems do not manifest. > I'm unfamiliar with the internals and put the offset manager as the possible > component, but that's a guess. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects
[ https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358354#comment-16358354 ] ASF GitHub Bot commented on KAFKA-6529: --- rajinisivaram closed pull request #4517: KAFKA-6529: Stop file descriptor leak when client disconnects with staged receives URL: https://github.com/apache/kafka/pull/4517 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 6bfcfd21a90..ed037b3a8f7 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -325,9 +325,9 @@ public void send(Send send) { } catch (Exception e) { // update the state for consistency, the channel will be discarded after `close` channel.state(ChannelState.FAILED_SEND); -// ensure notification via `disconnected` +// ensure notification via `disconnected` when `failedSends` are processed in the next poll this.failedSends.add(connectionId); -close(channel, false); +close(channel, false, false); if (!(e instanceof CancelledKeyException)) { log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}", connectionId, e); @@ -450,6 +450,7 @@ void pollSelectionKeys(Set selectionKeys, if (idleExpiryManager != null) idleExpiryManager.update(channel.id(), currentTimeNanos); +boolean sendFailed = false; try { /* complete any connections that have finished their handshake (either normally or immediately) */ @@ -491,7 +492,13 @@ void pollSelectionKeys(Set selectionKeys, /* if channel is ready write to any sockets that have space in their buffer and for which we have 
data */ if (channel.ready() && key.isWritable()) { -Send send = channel.write(); +Send send = null; +try { +send = channel.write(); +} catch (Exception e) { +sendFailed = true; +throw e; +} if (send != null) { this.completedSends.add(send); this.sensors.recordBytesSent(channel.id(), send.size()); @@ -500,7 +507,7 @@ void pollSelectionKeys(Set selectionKeys, /* cancel any defunct sockets */ if (!key.isValid()) -close(channel, true); +close(channel, true, true); } catch (Exception e) { String desc = channel.socketDescription(); @@ -510,7 +517,7 @@ else if (e instanceof AuthenticationException) // will be logged later as error log.debug("Connection with {} disconnected due to authentication exception", desc, e); else log.warn("Unexpected error from {}; closing connection", desc, e); -close(channel, true); +close(channel, !sendFailed, true); } finally { maybeRecordTimePerConnection(channel, channelStartTimeNanos); } @@ -620,7 +627,7 @@ private void maybeCloseOldestConnection(long currentTimeNanos) { log.trace("About to close the idle connection from {} due to being idle for {} millis", connectionId, (currentTimeNanos - expiredConnection.getValue()) / 1000 / 1000); channel.state(ChannelState.EXPIRED); -close(channel, true); +close(channel, true, true); } } } @@ -674,7 +681,7 @@ public void close(String id) { // There is no disconnect notification for local close, but updating // channel state here anyway to avoid confusion. channel.state(ChannelState.LOCAL_CLOSE); -close(channel, false); +close(channel, false, false); } else { KafkaChannel closingChannel = this.closingChannels.remove(id); // Close any closing channel, leave the channel in the state in which closing was triggered @@ -694,7 +701,10 @@ public void close(String id) { * closed immediately. The channel will not be added to disconnected list and it is the * responsibility of the caller to handle disconnect
[jira] [Updated] (KAFKA-6549) Deadlock while processing Controller Events
[ https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram updated KAFKA-6549:
----------------------------------
    Priority: Blocker (was: Major)
    Fix Version/s: 1.1.0
    Component/s: core

> Deadlock while processing Controller Events
> -------------------------------------------
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Manikumar
> Assignee: Manikumar
> Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
>
> Stack traces from a single node test cluster that was deadlocked while processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 nid=0x7d03 waiting on condition [0x7278b000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x0007bccadf30> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
>     at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
>     at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>     at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>     at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>     at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
>     at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>     at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>     at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>     at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
>     at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>     at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
>
>    Locked ownable synchronizers:
>     - <0x000780054860> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 nid=0xad03 waiting on condition [0x73fd3000]
>    java.lang.Thread.State: WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for <0x0007bcc584a0> (a java.util.concurrent.CountDownLatch$Sync)
>     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>     at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>     at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>     at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
>     at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
>     at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
>     at kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
>     at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
>     at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
>     at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>     at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>     at
[jira] [Created] (KAFKA-6551) Unbounded queues in WorkerSourceTask cause OutOfMemoryError
Gunnar Morling created KAFKA-6551:
----------------------------------

    Summary: Unbounded queues in WorkerSourceTask cause OutOfMemoryError
    Key: KAFKA-6551
    URL: https://issues.apache.org/jira/browse/KAFKA-6551
    Project: Kafka
    Issue Type: Bug
    Components: KafkaConnect
    Reporter: Gunnar Morling

A Debezium user reported an {{OutOfMemoryError}} to us, with over 50,000 messages in the {{WorkerSourceTask#outstandingMessages}} map. This map is unbounded, and I can't see any way of "rate limiting" that would control how many records are added to it. Growth can only be limited indirectly by reducing the offset flush interval, but since connectors can return large numbers of messages from a single {{poll()}} call, that's not sufficient in all cases.

Note that the user hit this issue while snapshotting a database, i.e. a high number of records arrived in a very short period of time.

To solve the problem, I'd suggest making this map backpressure-aware and thus preventing its indefinite growth, so that no further records are polled from the connector until messages have been taken out of the map again.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
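The backpressure idea suggested above could look roughly like the following. This is only a sketch under assumptions, not the Kafka Connect implementation: the class name `BoundedOutstandingMessages`, its methods, and the capacity parameter are all hypothetical, and a plain `HashMap` stands in for whatever structure `WorkerSourceTask` really uses. The caller is expected to stop calling `poll()` on the connector while `tryAdd` returns false.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical bounded tracker for in-flight records; not the actual WorkerSourceTask code. */
public class BoundedOutstandingMessages<K, V> {
    private final int maxOutstanding;                    // assumed capacity limit
    private final Map<K, V> outstanding = new HashMap<>();

    public BoundedOutstandingMessages(int maxOutstanding) {
        this.maxOutstanding = maxOutstanding;
    }

    /** Returns false when the map is full, signalling the caller to stop polling for now. */
    public synchronized boolean tryAdd(K key, V value) {
        if (!outstanding.containsKey(key) && outstanding.size() >= maxOutstanding) {
            return false;
        }
        outstanding.put(key, value);
        return true;
    }

    /** Called once a record has been acknowledged; frees its slot. */
    public synchronized void ack(K key) {
        outstanding.remove(key);
    }

    public synchronized int size() {
        return outstanding.size();
    }
}
```

With a capacity of 2, a third `tryAdd` is rejected until one of the earlier records has been acknowledged, which is the point at which polling could resume.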
[jira] [Commented] (KAFKA-6550) UpdateMetadataRequest should be lazily created
[ https://issues.apache.org/jira/browse/KAFKA-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358293#comment-16358293 ]

ASF GitHub Bot commented on KAFKA-6550:
---------------------------------------

huxihx opened a new pull request #4553: KAFKA-6550: UpdateMetadataRequest should be lazily created
URL: https://github.com/apache/kafka/pull/4553

We should defer constructing the UpdateMetadataRequest.Builder until the point where it is actually needed.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> UpdateMetadataRequest should be lazily created
> ----------------------------------------------
>
> Key: KAFKA-6550
> URL: https://issues.apache.org/jira/browse/KAFKA-6550
> Project: Kafka
> Issue Type: Improvement
> Components: controller
> Affects Versions: 1.0.0
> Reporter: huxihx
> Assignee: huxihx
> Priority: Minor
>
> In ControllerBrokerRequestBatch.sendRequestsToBrokers, there is no need to eagerly construct the UpdateMetadataRequest.Builder, since updateMetadataRequestBrokerSet is sometimes empty. In those cases, we should defer the construction until the builder is actually needed.
[jira] [Created] (KAFKA-6550) UpdateMetadataRequest should be lazily created
huxihx created KAFKA-6550:
--------------------------

    Summary: UpdateMetadataRequest should be lazily created
    Key: KAFKA-6550
    URL: https://issues.apache.org/jira/browse/KAFKA-6550
    Project: Kafka
    Issue Type: Improvement
    Components: controller
    Affects Versions: 1.0.0
    Reporter: huxihx
    Assignee: huxihx

In ControllerBrokerRequestBatch.sendRequestsToBrokers, there is no need to eagerly construct the UpdateMetadataRequest.Builder, since updateMetadataRequestBrokerSet is sometimes empty. In those cases, we should defer the construction until the builder is actually needed.
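As a rough illustration of the deferral described in this issue, the sketch below passes a factory instead of a ready-made object and only invokes it when the broker set is non-empty. It is not the controller code: `LazyBuilderSketch`, `sendRequests`, `newExpensiveBuilder`, and the `constructions` counter are hypothetical names, and a `String` stands in for the real builder type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.function.Supplier;

public class LazyBuilderSketch {
    /** Counts how often the (hypothetical) expensive builder was constructed. */
    public static int constructions = 0;

    /** Stand-in for an expensive constructor call; the returned String is a placeholder. */
    public static String newExpensiveBuilder() {
        constructions++;
        return "builder";
    }

    /** Builds the request object only if there is at least one broker to send it to. */
    public static List<String> sendRequests(Set<Integer> brokerIds, Supplier<String> builderFactory) {
        List<String> sent = new ArrayList<>();
        if (brokerIds.isEmpty()) {
            return sent;                          // nothing to send, so nothing is constructed
        }
        String builder = builderFactory.get();    // constructed once, on first real need
        for (int brokerId : brokerIds) {
            sent.add(builder + " -> broker " + brokerId);
        }
        return sent;
    }
}
```

Passing `LazyBuilderSketch::newExpensiveBuilder` as the supplier keeps the call site unchanged apart from the deferred construction; with an empty broker set the counter stays at zero.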
[jira] [Resolved] (KAFKA-6548) Migrate committed offsets from ZooKeeper to Kafka
[ https://issues.apache.org/jira/browse/KAFKA-6548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sönke Liebau resolved KAFKA-6548.
---------------------------------
    Resolution: Not A Problem

Hi [~ppmanikandan...@gmail.com], it sounds like, in principle, you know what you want to do and are on the right track. I don't think this should be filed in JIRA; it would be better placed on the Kafka users mailing list or Stack Overflow.

Actually, I just found that you also posted this to [stackoverflow|https://stackoverflow.com/questions/48696705/migrate-zookeeper-offset-details-to-kafka] and received an answer there, so I'll close this issue, as I think further discussion is better placed on SO.

> Migrate committed offsets from ZooKeeper to Kafka
> -------------------------------------------------
>
> Key: KAFKA-6548
> URL: https://issues.apache.org/jira/browse/KAFKA-6548
> Project: Kafka
> Issue Type: Improvement
> Components: offset manager
> Affects Versions: 0.10.0.0
> Environment: Windows
> Reporter: Manikandan P
> Priority: Minor
>
> We were using a previous version of Kafka (0.8.X), where all the offset details were stored in ZooKeeper.
> We have now moved to a newer version of Kafka (0.10.X), where all the topic offset details are stored in Kafka itself.
> We have to move all the topic offset details from ZooKeeper to Kafka for an existing application in production.
> Kafka is installed on a Windows machine, and we can't run kafka-consumer-groups.sh on Windows.
> Please advise how to migrate committed offsets from ZooKeeper to Kafka.
[jira] [Updated] (KAFKA-6549) Deadlock while processing Controller Events
[ https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar updated KAFKA-6549:
-----------------------------
    Summary: Deadlock while processing Controller Events (was: Deadlock in ZookeeperClient while processing Controller Events)

> Deadlock while processing Controller Events
> -------------------------------------------
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
> Issue Type: Bug
> Reporter: Manikumar
> Assignee: Manikumar
> Priority: Major
> Attachments: td.txt
>
>
> Stack traces from a single node test cluster that was deadlocked while processing controller Reelect and Expire events. Attached stack-trace.
[jira] [Created] (KAFKA-6549) Deadlock in ZookeeperClient while processing Controller Events
Manikumar created KAFKA-6549:
-----------------------------

    Summary: Deadlock in ZookeeperClient while processing Controller Events
    Key: KAFKA-6549
    URL: https://issues.apache.org/jira/browse/KAFKA-6549
    Project: Kafka
    Issue Type: Bug
    Reporter: Manikumar
    Assignee: Manikumar
    Attachments: td.txt

Stack traces from a single node test cluster that was deadlocked while processing controller Reelect and Expire events. Attached stack-trace.

{quote}
"main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 nid=0x7d03 waiting on condition [0x7278b000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x0007bccadf30> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
    at kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
    at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
    at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
    at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
    at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
    at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
    at kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
    at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
    at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)

   Locked ownable synchronizers:
    - <0x000780054860> (a java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)

"controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 nid=0xad03 waiting on condition [0x73fd3000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x0007bcc584a0> (a java.util.concurrent.CountDownLatch$Sync)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
    at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
    at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
    at kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
    at kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
    at kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
    at kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
    at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
    at kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

"kafka-shutdown-hook" #14 prio=5 os_prio=31 tid=0x7f83e29b1000 nid=0x560f waiting on condition [0x75208000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x000780054860> (a
[jira] [Commented] (KAFKA-3438) Rack Aware Replica Reassignment should warn of overloaded brokers
[ https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358216#comment-16358216 ]

Andy Coates commented on KAFKA-3438:
------------------------------------

Morning. Any reason why you're emailing this to me?

On 8 Feb 2018, at 14:22, Damian Guy (JIRA) wrote:

[ https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Damian Guy updated KAFKA-3438:
------------------------------
    Fix Version/s: (was: 1.1.0) 2.0.0

Rack Aware Replica Reassignment should warn of overloaded brokers
-----------------------------------------------------------------

    Key: KAFKA-3438
    URL: https://issues.apache.org/jira/browse/KAFKA-3438
    Project: Kafka
    Issue Type: Improvement
    Affects Versions: 0.10.0.0
    Reporter: Ben Stopford
    Assignee: Vahid Hashemian
    Priority: Major
    Fix For: 2.0.0

We've changed the replica reassignment code to be rack aware.

One problem that might catch users out is that they rebalance the cluster using kafka-reassign-partitions.sh, but their rack configuration means that a high proportion of replicas are pushed onto a single broker, or a small number of brokers.

This should be an easy problem to avoid by changing the rack assignment information, but we should probably warn users if they are going to create something that is unbalanced.

So imagine I have a Kafka cluster of 12 nodes spread over two racks with rack awareness enabled. If I add a 13th machine, on a new rack, and run the rebalance tool, that new machine will get ~6x as many replicas as the least loaded broker.

Suggest a warning be added to the tool output when --generate is called: "The most loaded broker has 2.3x as many replicas as the least loaded broker. This is likely due to an uneven distribution of brokers across racks. You're advised to alter the rack config so there are approximately the same number of brokers per rack". The tool should also display the individual rack→#brokers and broker→#replicas data for the proposed move.
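The check behind the proposed warning can be sketched as a comparison of the largest and smallest per-broker replica counts in a proposed assignment. This is a minimal sketch under assumptions, not code from kafka-reassign-partitions.sh: the class `ReplicaImbalanceCheck`, the `warning` method, and the `threshold` parameter are hypothetical, and the input is assumed to be a map from broker id to replica count.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class ReplicaImbalanceCheck {
    /**
     * Returns a warning when the most loaded broker holds at least `threshold` times
     * as many replicas as the least loaded one. The threshold is an assumed tuning knob.
     */
    public static Optional<String> warning(Map<Integer, Integer> replicasPerBroker, double threshold) {
        if (replicasPerBroker.isEmpty()) {
            return Optional.empty();
        }
        int max = Collections.max(replicasPerBroker.values());
        int min = Collections.min(replicasPerBroker.values());
        if (max == 0) {
            return Optional.empty();              // no replicas anywhere, nothing to compare
        }
        if (min == 0) {
            return Optional.of("At least one broker has no replicas, while the most loaded broker has " + max + ".");
        }
        double ratio = (double) max / min;
        if (ratio < threshold) {
            return Optional.empty();
        }
        return Optional.of(String.format(Locale.ROOT,
                "The most loaded broker has %.1fx as many replicas as the least loaded broker.", ratio));
    }
}
```

A real tool would presumably also print the rack→#brokers and broker→#replicas tables mentioned in the issue; the sketch covers only the ratio test.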
[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down
[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358171#comment-16358171 ]

huxihx commented on KAFKA-6446:
-------------------------------

[~edosciullo] I tried this approach and the fix passed in my local environment. Since initTransactions is the very first method called when a transaction starts, it's probably okay for us to refine only this method. However, even with this fix, the producer still failed to close after throwing TimeoutException, because the Sender thread got stuck and did not respond. I am not sure if it's fully safe to simply break from the catch clause in `maybeSendTransactionalRequest`.

> KafkaProducer with transactionId endless waits when bootstrap server is down
> ----------------------------------------------------------------------------
>
> Key: KAFKA-6446
> URL: https://issues.apache.org/jira/browse/KAFKA-6446
> Project: Kafka
> Issue Type: Bug
> Components: clients, producer
> Affects Versions: 0.11.0.0, 1.0.0
> Reporter: Eduardo Sciullo
> Priority: Critical
> Attachments: Test.java
>
>
> When the bootstrap server is down, a KafkaProducer with a transactionId waits endlessly on initTransactions.
> The timeouts don't apply to that operation: it doesn't honor the {{TRANSACTION_TIMEOUT_CONFIG}}.
> Attached is an example of my code to reproduce the scenario.
>
> I opened this issue as suggested by [Gary Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]
[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down
[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358168#comment-16358168 ]

Eduardo Sciullo commented on KAFKA-6446:
----------------------------------------

The same behavior appears for the commitTransaction, abortTransaction, etc. methods of KafkaProducer (they don't honor the {{TRANSACTION_TIMEOUT_CONFIG}}).

[~huxi_2b] I think that replacing await() with the timed version is a good idea.
[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down
[ https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358163#comment-16358163 ]

huxihx commented on KAFKA-6446:
-------------------------------

It sounds easy to replace await() with the timed version. However, there is another serious problem in this case: the Sender thread gets stuck in an infinite loop in `maybeSendTransactionalRequest`, which prevents the thread from being closed. [~becket_qin] Does that make sense?
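For readers unfamiliar with the "timed version" of await() discussed in this thread, the sketch below shows the general pattern with `java.util.concurrent.CountDownLatch`, whose `await(long, TimeUnit)` overload returns false once the timeout elapses instead of blocking forever. It is not the producer code: `TimedAwaitSketch` and `awaitCompleted` are hypothetical names, and how the real client would surface the timeout to callers is left open.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimedAwaitSketch {
    /**
     * Waits for the latch for at most timeoutMs milliseconds.
     * Returns true if the latch reached zero in time, false on timeout or interruption.
     */
    public static boolean awaitCompleted(CountDownLatch latch, long timeoutMs) {
        try {
            // Unlike latch.await(), this overload gives up after the timeout.
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            return false;
        }
    }
}
```

A caller could turn a false result into a timeout exception. As the comments above point out, this alone would not stop a background thread that keeps retrying in a loop.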