[jira] [Commented] (KAFKA-6106) Postpone normal processing of tasks within a thread until restoration of all tasks have completed

2018-02-09 Thread Kamal Chandraprakash (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359245#comment-16359245
 ] 

Kamal Chandraprakash commented on KAFKA-6106:
-

[~guozhang] I was busy with other tasks. I will try to complete it by this weekend.

> Postpone normal processing of tasks within a thread until restoration of all 
> tasks have completed
> -
>
> Key: KAFKA-6106
> URL: https://issues.apache.org/jira/browse/KAFKA-6106
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.11.0.1, 1.0.0
>Reporter: Guozhang Wang
>Assignee: Kamal Chandraprakash
>Priority: Major
>  Labels: newbie++
>
> Let's say a stream thread hosts multiple tasks, A and B. At the very 
> beginning, when A and B are assigned to the thread, the thread state is 
> {{TASKS_ASSIGNED}}, and the thread starts restoring these two tasks during 
> this state using the restore consumer, while using the normal consumer for 
> heartbeating.
> If task A's restoration completes earlier than task B's, then the thread 
> will start processing A immediately, even though it is still in the 
> {{TASKS_ASSIGNED}} phase. But processing task A will slow down the restoration 
> of task B, since the thread is single-threaded. So the thread's transition to 
> {{RUNNING}}, which happens only when all of its assigned tasks have completed 
> restoring and can be processed, will be delayed.
> Note that the streams instance's state will only transition to {{RUNNING}} 
> when all of its threads have transitioned to {{RUNNING}}, so the instance's 
> transition will also be delayed by this scenario.
> We should not start processing ready tasks immediately, but instead focus on 
> restoration during the {{TASKS_ASSIGNED}} state to shorten the overall time 
> of the instance's state transition.
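
For illustration, a compilable toy sketch of the restoration-first behavior 
being proposed (the Task interface, method names, and state names below are 
illustrative stand-ins, not the actual Kafka Streams internals):

{code:java}
import java.util.ArrayList;
import java.util.List;

// Toy model of a stream thread that defers processing until every assigned
// task has finished restoring, so early finishers don't steal cycles from
// the remaining restorations.
public class RestorationFirstLoop {
    enum State { TASKS_ASSIGNED, RUNNING }

    interface Task {
        boolean restoreSomeRecords(); // returns true once fully restored
        void process();
    }

    private State state = State.TASKS_ASSIGNED;
    private final List<Task> tasks = new ArrayList<>();

    void runOnce() {
        if (state == State.TASKS_ASSIGNED) {
            boolean allRestored = true;
            for (Task task : tasks) {
                // Keep restoring; do NOT process tasks that finished early.
                allRestored &= task.restoreSomeRecords();
            }
            if (allRestored) {
                state = State.RUNNING; // all tasks become processable together
            }
        } else {
            for (Task task : tasks) {
                task.process(); // normal processing for every task
            }
        }
    }
}
{code}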





[jira] [Updated] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects

2018-02-09 Thread Jeff Widman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Widman updated KAFKA-6529:
---
Fix Version/s: (was: 1.0.2)
   1.0.1

> Broker leaks memory and file descriptors after sudden client disconnects
> 
>
> Key: KAFKA-6529
> URL: https://issues.apache.org/jira/browse/KAFKA-6529
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Affects Versions: 1.0.0, 0.11.0.2
>Reporter: Graham Campbell
>Priority: Major
> Fix For: 1.1.0, 1.0.1, 0.11.0.3
>
>
> If a producer forcefully disconnects from a broker while it has staged 
> receives, that connection enters a limbo state where it is no longer 
> processed by the SocketServer.Processor, leaking the file descriptor for the 
> socket and the memory used for the staged receive queue for that connection.
> We noticed this during an upgrade from 0.9.0.2 to 0.11.0.2. Immediately after 
> the rolling restart to upgrade, open file descriptors on the brokers started 
> climbing uncontrollably. In a few cases brokers reached our configured max 
> open files limit of 100k and crashed before we rolled back.
> We tracked this down to a buildup of muted connections in the 
> Selector.closingChannels list. If a client disconnects from the broker with 
> multiple pending produce requests, then when the broker attempts to send an 
> ack to the client it receives an IOException because the TCP socket has been 
> closed. This triggers the Selector to close the channel, but because the 
> channel still has pending requests, it is added to Selector.closingChannels 
> so those requests can be processed. However, because that exception was 
> triggered by trying to send a response, the SocketServer.Processor has marked 
> the channel as muted and will no longer process it at all.
> *Reproduced by:*
> # Start a Kafka broker/cluster.
> # A client produces several messages and then disconnects abruptly (e.g. 
> _./rdkafka_performance -P -x 100 -b broker:9092 -t test_topic_).
> # The broker then leaks the file descriptor previously used for the TCP 
> socket and the memory for the unprocessed messages.
> *Proposed solution (which we've implemented internally):*
> Whenever an exception is encountered while writing to a socket in 
> Selector.pollSelectionKeys(...), record that the connection failed a send by 
> adding the KafkaChannel ID to Selector.failedSends, then re-raise the 
> exception to still trigger the socket disconnection logic. Since every 
> exception raised in this function triggers a disconnect, we also treat any 
> exception while writing to the socket as a failed send.
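
A minimal sketch of that proposed change inside the Selector's key-processing 
loop (heavily simplified: the surrounding write logic and the close/disconnect 
bookkeeping are Kafka internals and are only approximated here):

{code:java}
// Approximated excerpt of Selector.pollSelectionKeys(...), per selection key:
try {
    if (channel.ready() && key.isWritable()) {
        Send send = channel.write();   // throws if the peer has disconnected
        if (send != null)
            this.completedSends.add(send);
    }
} catch (Exception e) {
    // Proposed fix: record the failed send so the channel is fully torn
    // down, instead of lingering muted in closingChannels forever.
    this.failedSends.add(channel.id());
    throw e;  // re-raise so the existing disconnection logic still runs
}
{code}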





[jira] [Commented] (KAFKA-2967) Move Kafka documentation to ReStructuredText

2018-02-09 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16359020#comment-16359020
 ] 

Bill Bejeck commented on KAFKA-2967:


I don't know why, but I had the impression that Asciidoc was not an option for 
us. I've just spent a fair amount of time writing in Asciidoc, and given the 
choice between Asciidoc and RST I'd prefer Asciidoc.

Just my 2 cents.

> Move Kafka documentation to ReStructuredText
> 
>
> Key: KAFKA-2967
> URL: https://issues.apache.org/jira/browse/KAFKA-2967
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Gwen Shapira
>Priority: Major
>
> Storing documentation as HTML is kind of BS :)
> * Formatting is a pain, and making it look good is even worse
> * It's just HTML, so we can't generate PDFs
> * Reading and editing are painful
> * Validating changes is hard because our formatting relies on all kinds of 
> Apache Server features.
> I suggest:
> * Move to RST
> * Generate HTML and PDF during the build using the Sphinx plugin for Gradle.
> Lots of Apache projects are doing this.





[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records

2018-02-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358884#comment-16358884
 ] 

Guozhang Wang commented on KAFKA-6502:
--

Got it, thanks! I think this is a valid point; we will keep track of it as a 
known issue and consider a fix based on priorities.

> Kafka streams deserialization handler not committing offsets on error records
> -
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Soby Chacko
>Priority: Minor
>
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when a single error occurs: it successfully logs and continues. 
> However, on a continuous stream of errors, it seems these messages are 
> not committed, and on a restart of the application they reappear again. It is 
> more problematic if I try to send the messages in error to a DLQ: on a 
> restart, they are sent to the DLQ again. As soon as a good record comes 
> in, the offset moves forward and I no longer see the already-logged 
> messages after a restart. 
> I reproduced this behavior by running the sample provided here: 
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to 
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on 
> input, and reduced the commit interval to just 1 second. I also added the 
> following to the config:
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>  LogAndContinueExceptionHandler.class);}}
>  It looks like when deserialization exceptions occur, this flag is never set 
> to true here: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
>  It only becomes true once processing succeeds. That might be the reason why 
> the commit is not happening even after I manually call processorContext#commit().
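
For reference, the reproduction settings described above, gathered into one 
place (a sketch; the application id and bootstrap servers are placeholders):

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;

public class ReproConfig {
    public static Properties build() {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "dlq-repro");          // placeholder
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // Wrong value Serde on purpose, to force deserialization errors on input:
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.Integer().getClass().getName());
        // Commit every second so committed offsets are easy to observe:
        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        // Log the bad record and keep going instead of dying:
        streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                LogAndContinueExceptionHandler.class);
        return streamsConfiguration;
    }
}
{code}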





[jira] [Commented] (KAFKA-6549) Deadlock while processing Controller Events

2018-02-09 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358859#comment-16358859
 ] 

Manikumar commented on KAFKA-6549:
--

This is not exactly a deadlock. ZooKeeperClient requests are getting blocked 
during the ZooKeeper disconnect scenario. Because of this, some threads are 
waiting forever, blocking the shutdown thread. I am still investigating and 
will update with more details.

> Deadlock while processing Controller Events
> ---
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
>
> Stack traces from a single node test cluster that was deadlocked while 
> processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 
> nid=0x7d03 waiting on condition [0x7278b000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bccadf30> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
>  at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>  at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
>  - <0x000780054860> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>  
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 
> nid=0xad03 waiting on condition [0x73fd3000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bcc584a0> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
>  at 
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
>  at 
> kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
>  at 
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
>  at 
> 

[jira] [Commented] (KAFKA-4651) Improve test coverage of Stores

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358817#comment-16358817
 ] 

ASF GitHub Bot commented on KAFKA-4651:
---

bbejeck opened a new pull request #4555: KAFKA-4651: [WIP] improve test 
coverage of stores
URL: https://github.com/apache/kafka/pull/4555
 
 
   Working on increasing the coverage of stores in unit tests.
   Started with `InMemoryKeyValueLoggedStore`:
   ![screen shot 2018-02-09 at 1 15 11 pm](https://user-images.githubusercontent.com/199238/36044796-c7ddf31e-0da1-11e8-87a5-1d6727a5fe4d.png)
   ![screen shot 2018-02-09 at 1 24 38 pm](https://user-images.githubusercontent.com/199238/36044805-cd1bc23e-0da1-11e8-9b57-8f8bfa0a21ea.png)
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve test coverage of Stores
> ---
>
> Key: KAFKA-4651
> URL: https://issues.apache.org/jira/browse/KAFKA-4651
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Damian Guy
>Priority: Minor
>  Labels: newbie
>
> Some factory methods aren't tested
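
As an illustration of the kind of tests being added, a minimal check of one 
factory method might look like this (a sketch against the public Stores API; 
the test class name is arbitrary):

{code:java}
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;

public class StoresFactoryTest {

    @Test
    public void inMemoryKeyValueStoreSupplierShouldCarryName() {
        KeyValueBytesStoreSupplier supplier = Stores.inMemoryKeyValueStore("my-store");
        assertEquals("my-store", supplier.name());
    }

    @Test
    public void keyValueStoreBuilderShouldBuildNamedStore() {
        StoreBuilder<KeyValueStore<String, String>> builder =
                Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("my-store"),
                                            Serdes.String(), Serdes.String());
        KeyValueStore<String, String> store = builder.build();
        assertNotNull(store);
        assertEquals("my-store", store.name());
    }
}
{code}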





[jira] [Updated] (KAFKA-4879) KafkaConsumer.position may hang forever when deleting a topic

2018-02-09 Thread Damian Guy (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-4879:
--
Fix Version/s: (was: 1.1.0)
   2.0.0

> KafkaConsumer.position may hang forever when deleting a topic
> -
>
> Key: KAFKA-4879
> URL: https://issues.apache.org/jira/browse/KAFKA-4879
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 0.10.2.0
>Reporter: Shixiong Zhu
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.0.0
>
>
> KafkaConsumer.position may hang forever when deleting a topic. The problem is 
> this line: 
> https://github.com/apache/kafka/blob/022bf129518e33e165f9ceefc4ab9e622952d3bd/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L374
> The timeout is {{Long.MAX_VALUE}}, so it will just retry forever on 
> UnknownTopicOrPartitionException.
> Here is a reproducer:
> {code}
> import org.apache.kafka.clients.consumer.KafkaConsumer;
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import java.util.Collections;
> import java.util.Properties;
> import java.util.Set;
> public class KafkaReproducer {
>   public static void main(String[] args) {
>     // Make sure "delete.topic.enable" is set to true.
>     // Please create the topic "test" with 3 partitions manually.
>     // The issue is gone when there is only one partition.
>     String topic = "test";
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "localhost:9092");
>     props.put("group.id", "testgroup");
>     props.put("value.deserializer", StringDeserializer.class.getName());
>     props.put("key.deserializer", StringDeserializer.class.getName());
>     props.put("enable.auto.commit", "false");
>     KafkaConsumer<String, String> kc = new KafkaConsumer<>(props);
>     kc.subscribe(Collections.singletonList(topic));
>     kc.poll(0);
>     Set<TopicPartition> partitions = kc.assignment();
>     System.out.println("partitions: " + partitions);
>     kc.pause(partitions);
>     kc.seekToEnd(partitions);
>     System.out.println("please delete the topic in 30 seconds");
>     try {
>       // Sleep 30 seconds to give us enough time to delete the topic.
>       Thread.sleep(30000);
>     } catch (InterruptedException e) {
>       e.printStackTrace();
>     }
>     System.out.println("sleep end");
>     for (TopicPartition p : partitions) {
>       System.out.println(p + " offset: " + kc.position(p));
>     }
>     System.out.println("cannot reach here");
>     kc.close();
>   }
> }
> {code}





[jira] [Updated] (KAFKA-5214) Re-add KafkaAdminClient#apiVersions

2018-02-09 Thread Colin P. McCabe (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin P. McCabe updated KAFKA-5214:
---
 Priority: Minor  (was: Blocker)
Fix Version/s: (was: 1.1.0)
   1.2.0
   Issue Type: Improvement  (was: Bug)

> Re-add KafkaAdminClient#apiVersions
> ---
>
> Key: KAFKA-5214
> URL: https://issues.apache.org/jira/browse/KAFKA-5214
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Minor
> Fix For: 1.2.0
>
>
> We removed KafkaAdminClient#apiVersions just before 0.11.0.0 to give us a bit 
> more time to iterate on it before it's included in a release. We should add 
> the relevant methods back.





[jira] [Commented] (KAFKA-6549) Deadlock while processing Controller Events

2018-02-09 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358729#comment-16358729
 ] 

Ted Yu commented on KAFKA-6549:
---

kafka-shutdown-hook didn't hold onto any lock.

Can you tell me how the deadlock is formed?

bq.  waiting on condition [0x7278b000]

The above (0x7278b000) only appeared once in the dump.

> Deadlock while processing Controller Events
> ---
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
>
> Stack traces from a single node test cluster that was deadlocked while 
> processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 
> nid=0x7d03 waiting on condition [0x7278b000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bccadf30> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
>  at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>  at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
>  - <0x000780054860> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>  
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 
> nid=0xad03 waiting on condition [0x73fd3000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bcc584a0> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
>  at 
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
>  at 
> kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
>  at 
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>  at 
> 

[jira] [Resolved] (KAFKA-6407) Sink task metrics are the same for all connectors

2018-02-09 Thread Robert Yokota (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Robert Yokota resolved KAFKA-6407.
--
Resolution: Duplicate

> Sink task metrics are the same for all connectors
> -
>
> Key: KAFKA-6407
> URL: https://issues.apache.org/jira/browse/KAFKA-6407
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Alexander Koval
>Priority: Minor
>
> I have a lot of sink connectors inside a distributed worker. When I tried to 
> save metrics to Graphite, I discovered all task metrics are identical.
> {code}
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0:
> sink-record-read-total = 228744.0;
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0:
> sink-record-read-total = 228744.0;
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0:
> sink-record-read-total = 228744.0;
> {code}
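
To compare those values programmatically rather than via jmxterm, a small JMX 
check along these lines could be used (a sketch; it assumes it runs inside the 
worker JVM - a remote worker would need a JMX connector instead - and it reuses 
the connector names from the report):

{code:java}
import java.lang.management.ManagementFactory;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class SinkMetricCheck {
    public static void main(String[] args) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        String[] connectors = {"prom-by-catalog-company", "prom-kz-catalog-product",
                               "prom-ru-catalog-company"};
        for (String connector : connectors) {
            // Same MBean pattern as the jmxterm queries above.
            ObjectName name = new ObjectName(
                    "kafka.connect:type=sink-task-metrics,connector=" + connector + ",task=0");
            Object value = server.getAttribute(name, "sink-record-read-total");
            System.out.println(connector + " sink-record-read-total = " + value);
        }
    }
}
{code}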





[jira] [Updated] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.

2018-02-09 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-5575:
-
Labels: needs-kip  (was: )

> SchemaBuilder should have a method to clone an existing Schema.
> ---
>
> Key: KAFKA-5575
> URL: https://issues.apache.org/jira/browse/KAFKA-5575
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>  Labels: needs-kip
>
> Now that Transformations have landed in Kafka Connect, we should have an easy 
> way to make quick modifications to schemas. For example, changing the name of 
> a schema shouldn't be much more than a one-liner; I should be able to do 
> things like this:
> {code:java}
> return SchemaBuilder.from(Schema.STRING_SCHEMA).name("MyNewName").build()
> {code}
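
Until a built-in {{SchemaBuilder.from}} exists, a user-side helper is possible 
with the public Connect API. A minimal sketch (the class name is arbitrary, and 
for brevity only struct fields are copied; array and map schemas would also 
need their key/value schemas handled):

{code:java}
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class SchemaClone {
    // Copy an existing Schema into a fresh SchemaBuilder so callers can
    // tweak attributes (e.g. the name) before calling build().
    public static SchemaBuilder from(Schema schema) {
        SchemaBuilder builder = new SchemaBuilder(schema.type());
        if (schema.name() != null) builder.name(schema.name());
        if (schema.version() != null) builder.version(schema.version());
        if (schema.doc() != null) builder.doc(schema.doc());
        if (schema.isOptional()) builder.optional();
        if (schema.parameters() != null) builder.parameters(schema.parameters());
        if (schema.type() == Schema.Type.STRUCT) {
            for (Field field : schema.fields()) {
                builder.field(field.name(), field.schema());
            }
        }
        return builder;
    }
}
{code}

With that helper, the example above becomes 
{{SchemaClone.from(Schema.STRING_SCHEMA).name("MyNewName").build()}}.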





[jira] [Commented] (KAFKA-6407) Sink task metrics are the same for all connectors

2018-02-09 Thread Robert Yokota (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358714#comment-16358714
 ] 

Robert Yokota commented on KAFKA-6407:
--

Yes, I believe so.  I'll resolve it as duplicate.

> Sink task metrics are the same for all connectors
> -
>
> Key: KAFKA-6407
> URL: https://issues.apache.org/jira/browse/KAFKA-6407
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Alexander Koval
>Priority: Minor
>
> I have a lot of sink connectors inside a distributed worker. When I tried to 
> save metrics to Graphite, I discovered all task metrics are identical.
> {code}
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0:
> sink-record-read-total = 228744.0;
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0:
> sink-record-read-total = 228744.0;
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0:
> sink-record-read-total = 228744.0;
> {code}





[jira] [Comment Edited] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.

2018-02-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358653#comment-16358653
 ] 

Randall Hauch edited comment on KAFKA-5575 at 2/9/18 5:13 PM:
--

-[~ewencp], is this an example of something that we would not need a KIP for 
because it is a simple addition, or should we just do a KIP for everything?-

Saw that [~ewencp] already wrote up a KIP for this: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73635931


was (Author: rhauch):
[~ewencp], is this an example of something that we would not need a KIP for 
because it is a simple addition, or should we just do a KIP for everything?

> SchemaBuilder should have a method to clone an existing Schema.
> ---
>
> Key: KAFKA-5575
> URL: https://issues.apache.org/jira/browse/KAFKA-5575
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>  Labels: needs-kip
>
> Now that Transformations have landed in Kafka Connect, we should have an easy 
> way to make quick modifications to schemas. For example, changing the name of 
> a schema shouldn't be much more than a one-liner; I should be able to do 
> things like this:
> {code:java}
> return SchemaBuilder.from(Schema.STRING_SCHEMA).name("MyNewName").build()
> {code}





[jira] [Comment Edited] (KAFKA-5896) Kafka Connect task threads never interrupted

2018-02-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358670#comment-16358670
 ] 

Randall Hauch edited comment on KAFKA-5896 at 2/9/18 4:59 PM:
--

[~ewencp], do you have any thoughts on this? I know in the past you've talked 
about not wanting to do this since some developers won't properly implement the 
interruption. I agree that not everyone implements it correctly, but we could 
at least _try_ to cancel the tasks. And this latest PR seems like a good 
approach.


was (Author: rhauch):
[~ewencp], do you have any thoughts on this? I know in the past you've talked 
about not wanting to do this since some developers won't properly implement the 
interruption. I agree that not everyone implements it correctly, but we could 
at least _try_ to cancel the tasks. 

> Kafka Connect task threads never interrupted
> 
>
> Key: KAFKA-5896
> URL: https://issues.apache.org/jira/browse/KAFKA-5896
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nick Pillitteri
>Assignee: Nick Pillitteri
>Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads. 
> When tasks are stopped or restarted, a flag is set - {{stopping}} - to 
> indicate the task should stop processing records. However, if the thread the 
> task is running in is blocked (waiting for a lock or performing I/O) it's 
> possible the task will never stop.
> I've created a connector specifically to demonstrate this issue (along with 
> some more detailed instructions for reproducing the issue): 
> https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved 
> connector (any connector that does I/O without timeouts) can cause the Kafka 
> Connect worker to get into a state where the only solution is to restart the 
> JVM.
> I think, but couldn't reproduce, that this is the cause of this problem on 
> Stack Overflow: 
> https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is 
> running in. In the past across various other libraries, this is what I've 
> seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run 
> indefinitely. It uses a timeout while waiting for the task to stop but after 
> this timeout has expired it simply sets a {{cancelled}} flag. This means that 
> every time a task is restarted, a new thread running the task will be 
> created. Thus a task may end up with multiple instances all running in their 
> own threads when there's only supposed to be a single thread.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here: 
> https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to 
> shut down gracefully here: 
> https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and 
> submit a PR if people agree that this is a bug and interrupting threads is 
> the right fix.
> Thanks!
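
For illustration, a minimal sketch of the forcible-interrupt approach being 
discussed (it assumes the worker keeps each task's Future; the names are 
illustrative, not the actual Worker internals):

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class InterruptingTaskStopper {
    private final ExecutorService executor = Executors.newCachedThreadPool();

    public Future<?> start(Runnable task) {
        return executor.submit(task);
    }

    // Called after the task's stopping flag has been set: wait up to the
    // timeout for a graceful exit, then interrupt the task's thread so that
    // interruptible blocking calls (locks, most I/O) are broken out of.
    public void stop(Future<?> taskFuture, long timeoutMs) throws Exception {
        try {
            taskFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            taskFuture.cancel(true); // interrupts the still-running thread
        }
    }
}
{code}

Note that interruption only helps when the blocked call is interruptible; a 
connector stuck in non-interruptible I/O would still hang, which is the caveat 
raised above.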





[jira] [Commented] (KAFKA-5896) Kafka Connect task threads never interrupted

2018-02-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358670#comment-16358670
 ] 

Randall Hauch commented on KAFKA-5896:
--

[~ewencp], do you have any thoughts on this? I know in the past you've talked 
about not wanting to do this since some developers won't properly implement the 
interruption. I agree that not everyone implements it correctly, but we could 
at least _try_ to cancel the tasks. 

> Kafka Connect task threads never interrupted
> 
>
> Key: KAFKA-5896
> URL: https://issues.apache.org/jira/browse/KAFKA-5896
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nick Pillitteri
>Assignee: Nick Pillitteri
>Priority: Minor
>
> h2. Problem
> Kafka Connect tasks associated with connectors are run in their own threads. 
> When tasks are stopped or restarted, a flag is set - {{stopping}} - to 
> indicate the task should stop processing records. However, if the thread the 
> task is running in is blocked (waiting for a lock or performing I/O) it's 
> possible the task will never stop.
> I've created a connector specifically to demonstrate this issue (along with 
> some more detailed instructions for reproducing the issue): 
> https://github.com/smarter-travel-media/hang-connector
> I believe this is an issue because it means that a single badly behaved 
> connector (any connector that does I/O without timeouts) can cause the Kafka 
> Connect worker to get into a state where the only solution is to restart the 
> JVM.
> I think, but couldn't reproduce, that this is the cause of this problem on 
> Stack Overflow: 
> https://stackoverflow.com/questions/43802156/inconsistent-connector-state-connectexception-task-already-exists-in-this-work
> h2. Expected Result
> I would expect the Worker to eventually interrupt the thread that the task is 
> running in. In the past across various other libraries, this is what I've 
> seen done when a thread needs to be forcibly stopped.
> h2. Actual Result
> In actuality, the Worker sets a {{stopping}} flag and lets the thread run 
> indefinitely. It uses a timeout while waiting for the task to stop but after 
> this timeout has expired it simply sets a {{cancelled}} flag. This means that 
> every time a task is restarted, a new thread running the task will be 
> created. Thus a task may end up with multiple instances all running in their 
> own threads when there's only supposed to be a single thread.
> h2. Steps to Reproduce
> The problem can be replicated by using the connector available here: 
> https://github.com/smarter-travel-media/hang-connector
> Apologies for how involved the steps are.
> I've created a patch that forcibly interrupts threads after they fail to 
> shut down gracefully here: 
> https://github.com/smarter-travel-media/kafka/commit/295c747a9fd82ee8b30556c89c31e0bfcce5a2c5
> I've confirmed that this fixes the issue. I can add some unit tests and 
> submit a PR if people agree that this is a bug and interrupting threads is 
> the right fix.
> Thanks!





[jira] [Commented] (KAFKA-5575) SchemaBuilder should have a method to clone an existing Schema.

2018-02-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358653#comment-16358653
 ] 

Randall Hauch commented on KAFKA-5575:
--

[~ewencp], is this an example of something that we would not need a KIP for 
because it is a simple addition, or should we just do a KIP for everything?

> SchemaBuilder should have a method to clone an existing Schema.
> ---
>
> Key: KAFKA-5575
> URL: https://issues.apache.org/jira/browse/KAFKA-5575
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>
> Now that Transformations have landed in Kafka Connect, we should have an easy 
> way to make quick modifications to schemas. For example, changing the name of 
> a schema shouldn't be much more than a one-liner; I should be able to do 
> things like this:
> {code:java}
> return SchemaBuilder.from(Schema.STRING_SCHEMA).name("MyNewName").build()
> {code}





[jira] [Updated] (KAFKA-4159) Allow overriding producer & consumer properties at the connector level

2018-02-09 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-4159:
-
Labels: needs-kip  (was: )

> Allow overriding producer & consumer properties at the connector level
> --
>
> Key: KAFKA-4159
> URL: https://issues.apache.org/jira/browse/KAFKA-4159
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Shikhar Bhushan
>Assignee: Stephen Durfey
>Priority: Major
>  Labels: needs-kip
>
> As an example use case: overriding a sink connector's consumer's partition 
> assignment strategy.





[jira] [Commented] (KAFKA-6502) Kafka streams deserialization handler not committing offsets on error records

2018-02-09 Thread Soby Chacko (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358638#comment-16358638
 ] 

Soby Chacko commented on KAFKA-6502:


Hi Guozhang, your reasoning is correct. However, if I introduce a custom DLQ 
type of exception handler (in which case, if I have a bad message, it is sent 
to an error topic), then when I restart the application, I don't want the 
poison pills to be re-sent to the DLQ, as they have already been sent. 
Anyway, this is a corner case I ran into and may not happen frequently. Thanks!

> Kafka streams deserialization handler not committing offsets on error records
> -
>
> Key: KAFKA-6502
> URL: https://issues.apache.org/jira/browse/KAFKA-6502
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Soby Chacko
>Priority: Minor
>
> See this StackOverflow issue: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler]
> and this comment: 
> [https://stackoverflow.com/questions/48470899/kafka-streams-deserialization-handler#comment84018564_48470899]
>  I am trying to use the LogAndContinueExceptionHandler on deserialization. It 
> works fine when a single error occurs: it successfully logs and continues. 
> However, on a continuous stream of errors, it seems these messages are 
> not committed, and on a restart of the application they reappear again. It is 
> more problematic if I try to send the messages in error to a DLQ: on a 
> restart, they are sent to the DLQ again. As soon as a good record comes 
> in, the offset moves forward and I no longer see the already-logged 
> messages after a restart. 
> I reproduced this behavior by running the sample provided here: 
> [https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/WordCountLambdaExample.java]
> I changed the incoming value Serde to 
> {{Serdes.Integer().getClass().getName()}} to force a deserialization error on 
> input, and reduced the commit interval to just 1 second. I also added the 
> following to the config:
> {{streamsConfiguration.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
>  LogAndContinueExceptionHandler.class);}}
>  It looks like when deserialization exceptions occur, this flag is never set 
> to true here: 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L228].
>  It only becomes true once processing succeeds. That might be the reason why 
> the commit is not happening even after I manually call processorContext#commit().





[jira] [Commented] (KAFKA-6208) Reduce startup time for Kafka Connect workers

2018-02-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358637#comment-16358637
 ] 

Randall Hauch commented on KAFKA-6208:
--

[~Satyajit], it might be possible to start scanning asynchronously, but then 
whenever code needs a plugin it would have to block. Doable, but that's logged 
under KAFKA-5451 (now linked).

> Reduce startup time for Kafka Connect workers
> -
>
> Key: KAFKA-6208
> URL: https://issues.apache.org/jira/browse/KAFKA-6208
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Major
>
> Kafka Connect startup times are excessive with a handful of connectors on the 
> plugin path or classpath. We should not be scanning three times (once for 
> connectors, once for SMTs, and once for converters), and hopefully we can 
> avoid scanning directories that are clearly not plugin directories. 
> We should also consider using Java's Service Loader to quickly identify 
> connectors. The latter would require a KIP and would require time for 
> connectors to migrate, but we could be smarter about only scanning plugin 
> directories that need to be scanned.
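
For reference, the Service Loader mechanism mentioned above works roughly like 
this (a sketch; using the Connector base class directly as the service type is 
an assumption, not a settled design):

{code:java}
import java.util.ServiceLoader;

import org.apache.kafka.connect.connector.Connector;

public class ConnectorDiscovery {
    public static void main(String[] args) {
        // ServiceLoader reads META-INF/services/org.apache.kafka.connect.connector.Connector
        // entries from each jar, so plugins are found without scanning every class file.
        ServiceLoader<Connector> loader = ServiceLoader.load(Connector.class);
        for (Connector connector : loader) {
            System.out.println("Found connector: " + connector.getClass().getName());
        }
    }
}
{code}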





[jira] [Commented] (KAFKA-6407) Sink task metrics are the same for all connectors

2018-02-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6407?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358619#comment-16358619
 ] 

Randall Hauch commented on KAFKA-6407:
--

[~rayokota], does this sound like a duplicate of KAFKA-6504?

> Sink task metrics are the same for all connectors
> -
>
> Key: KAFKA-6407
> URL: https://issues.apache.org/jira/browse/KAFKA-6407
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Alexander Koval
>Priority: Minor
>
> I have a lot of sink connectors inside a distributed worker. When I tried to 
> save metrics to Graphite, I discovered all task metrics are identical.
> {code}
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-by-catalog-company,task=0:
> sink-record-read-total = 228744.0;
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-kz-catalog-product,task=0:
> sink-record-read-total = 228744.0;
> $>get -b 
> kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0 
> sink-record-read-total
> #mbean = 
> kafka.connect:type=sink-task-metrics,connector=prom-ru-catalog-company,task=0:
> sink-record-read-total = 228744.0;
> {code}





[jira] [Resolved] (KAFKA-4632) Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException

2018-02-09 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-4632.
--
   Resolution: Fixed
Fix Version/s: 0.10.0.1
   0.10.1.0

I'm going to close this as fixed in 0.10.0.1. [~ScottReynolds], if you 
disagree, please feel free to reopen with more detail.

> Kafka Connect WorkerSinkTask.closePartitions doesn't handle WakeupException
> ---
>
> Key: KAFKA-4632
> URL: https://issues.apache.org/jira/browse/KAFKA-4632
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0
>Reporter: Scott Reynolds
>Priority: Major
> Fix For: 0.10.1.0, 0.10.0.1
>
>
> WorkerSinkTask's closePartitions method isn't handling WakeupException that 
> can be thrown from commitSync.
> {code}
> org.apache.kafka.common.errors.WakeupException
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup
>  (ConsumerNetworkClient.java:404)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll 
> (ConsumerNetworkClient.java:245)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll 
> (ConsumerNetworkClient.java:180)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync
>  (ConsumerCoordinator.java:499)
> at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync 
> (KafkaConsumer.java:1104)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitSync 
> (WorkerSinkTask.java:245)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit 
> (WorkerSinkTask.java:264)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets 
> (WorkerSinkTask.java:305)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions 
> (WorkerSinkTask.java:435)
> at org.apache.kafka.connect.runtime.WorkerSinkTask.execute 
> (WorkerSinkTask.java:147)
> at org.apache.kafka.connect.runtime.WorkerTask.doRun (WorkerTask.java:140)
> at org.apache.kafka.connect.runtime.WorkerTask.run (WorkerTask.java:175)
> at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
> at java.util.concurrent.FutureTask.run (FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker 
> (ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run 
> (ThreadPoolExecutor.java:617)
> at java.lang.Thread.run (Thread.java:745)
> {code}
> I believe it should catch and ignore it, as that is what the poll method 
> does when isStopping is true:
> {code:java}
> } catch (WakeupException we) {
>     log.trace("{} consumer woken up", id);
>     if (isStopping())
>         return;
>     if (shouldPause()) {
>         pauseAll();
>     } else if (!pausedForRedelivery) {
>         resumeAll();
>     }
> }
> {code}
> But I'm unsure; I'd love some insight into this.
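
A sketch of what that handling could look like in closePartitions itself (a 
hypothetical method body, inferred only from the stack trace above):

{code:java}
// Hypothetical: mirror poll()'s WakeupException handling when committing
// offsets during closePartitions().
private void closePartitions() {
    try {
        commitOffsets(); // delegates to consumer.commitSync(), per the stack trace
    } catch (WakeupException we) {
        log.trace("{} consumer woken up while closing partitions; ignoring", id);
    }
}
{code}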





[jira] [Commented] (KAFKA-6208) Reduce startup time for Kafka Connect workers

2018-02-09 Thread Randall Hauch (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16358600#comment-16358600
 ] 

Randall Hauch commented on KAFKA-6208:
--

[~Satyajit], I would argue that anything with the scanning approach should be 
fixed as part of KAFKA-6503, for which there is a simple fix. This issue, OTOH, 
is more about moving to a non-scanning approach, and that requires a 
longer-term phase-in of the new mechanism and phase-out of the existing 
scanning mechanism.

Regarding your specific suggestion, I don't believe that loading the different 
components asynchronously would be of any benefit over the much simpler 
suggestion in KAFKA-6503, which is already going to utilize multiple threads.

> Reduce startup time for Kafka Connect workers
> -
>
> Key: KAFKA-6208
> URL: https://issues.apache.org/jira/browse/KAFKA-6208
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Randall Hauch
>Priority: Major
>
> Kafka Connect startup times are excessive with a handful of connectors on the 
> plugin path or classpath. We should not be scanning three times (once for 
> connectors, once for SMTs, and once for converters), and hopefully we can 
> avoid scanning directories that are clearly not plugin directories. 
> We should also consider using Java's Service Loader to quickly identify 
> connectors. The latter would require a KIP and would require time for 
> connectors to migrate, but we could be smarter about only scanning plugin 
> directories that need to be scanned.





[jira] [Updated] (KAFKA-6503) Connect: Plugin scan is very slow

2018-02-09 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6503:
-
Fix Version/s: 1.1.0

> Connect: Plugin scan is very slow
> -
>
> Key: KAFKA-6503
> URL: https://issues.apache.org/jira/browse/KAFKA-6503
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Critical
> Fix For: 1.1.0
>
>
> Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is 
> very slow - see the logs from starting my Kafka-Connect instance at the 
> bottom. It takes almost 4 minutes of scanning. I am running Kafka-Connect in 
> Docker, based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to 
> /usr/share/java. The only thing I have added is a 13MB jar in 
> /usr/share/java/kafka-connect-file-streamer-client containing two connectors 
> and a converter. That one alone seems to take 20 secs.
> If it were just scanning in the background while everything kept working, it 
> probably would not be a big issue. But it does not work that way. Right after 
> starting the Kafka-Connect instance I try to create a connector via the 
> /connectors endpoint, but it will not succeed before the plugin scanning has 
> finished (4 minutes).
> I am not even sure why scanning is necessary. Is it not always true that 
> connectors, converters, etc. are mentioned by name? So to see if one exists, 
> just try to load the class - the classloader will tell if it is available. 
> Hmmm, there is probably a reason.
> Anyway, either it should be made much faster, or at least Kafka-Connect 
> should be fully functional (or as functional as possible) while scanning is 
> going on.
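
For what it's worth, the lazy, by-name lookup suggested above is roughly this 
(a sketch; the classloader is a placeholder, and Connect's scan also serves 
other purposes, such as listing the available plugins):

{code:java}
import org.apache.kafka.connect.connector.Connector;

public class LazyPluginLookup {
    // Resolve a plugin only when a connector config names it, instead of
    // scanning every jar on the plugin path up front.
    public static Connector load(String className, ClassLoader pluginLoader)
            throws ReflectiveOperationException {
        // Throws ClassNotFoundException if the plugin is not available.
        Class<?> clazz = Class.forName(className, true, pluginLoader);
        return clazz.asSubclass(Connector.class).getDeclaredConstructor().newInstance();
    }
}
{code}
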
> {code}
> [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a 
> moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
> [2018-01-30 13:52:27,218] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-file-streamer-client 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,037] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,038] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,039] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,040] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,049] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-elasticsearch 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,595] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,611] INFO Added plugin 
> 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,651] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-jdbc 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Added plugin 
> 'io.confluent.connect.jdbc.JdbcSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,492] INFO Added plugin 
> 'io.confluent.connect.jdbc.JdbcSourceConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,663] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-s3 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,055] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-s3/} 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,055] INFO Added plugin 
> 'io.confluent.connect.s3.S3SinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,061] INFO Added plugin 
> 'io.confluent.connect.storage.tools.SchemaSourceConnector' 
> 

[jira] [Updated] (KAFKA-6503) Connect: Plugin scan is very slow

2018-02-09 Thread Randall Hauch (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-6503:
-
Issue Type: Bug  (was: Improvement)

> Connect: Plugin scan is very slow
> -
>
> Key: KAFKA-6503
> URL: https://issues.apache.org/jira/browse/KAFKA-6503
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 1.0.0
>Reporter: Per Steffensen
>Priority: Critical
> Fix For: 1.1.0
>
>
> Just upgraded to 1.0.0. It seems some plugin scan has been introduced. It is 
> very slow - see the logs from starting my Kafka-Connect instance at the 
> bottom. It takes almost 4 minutes of scanning. I am running Kafka-Connect in 
> Docker, based on confluentinc/cp-kafka-connect:4.0.0. I set plugin.path to 
> /usr/share/java. The only thing I have added is a 13MB jar in 
> /usr/share/java/kafka-connect-file-streamer-client containing two connectors 
> and a converter. That one alone seems to take 20 secs.
> If it were just scanning in the background while everything kept working, it 
> probably would not be a big issue. But it does not work that way. Right after 
> starting the Kafka-Connect instance I try to create a connector via the 
> /connectors endpoint, but it will not succeed before the plugin scanning has 
> finished (4 minutes).
> I am not even sure why scanning is necessary. Is it not always true that 
> connectors, converters, etc. are mentioned by name? So to see if one exists, 
> just try to load the class - the classloader will tell if it is available. 
> Hmmm, there is probably a reason.
> Anyway, either it should be made much faster, or at least Kafka-Connect 
> should be fully functional (or as functional as possible) while scanning is 
> going on.
> {code}
> [2018-01-30 13:52:26,834] INFO Scanning for plugin classes. This might take a 
> moment ... (org.apache.kafka.connect.cli.ConnectDistributed)
> [2018-01-30 13:52:27,218] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-file-streamer-client 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,037] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-file-streamer-client/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,038] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerStreamSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,039] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.OneTaskPerFilesStreamerServerSourceConnectorManager'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,040] INFO Added plugin 
> 'com.tlt.common.files.streamer.client.kafka.connect.KafkaConnectByteArrayConverter'
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:43,049] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-elasticsearch 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,595] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-elasticsearch/}
>  (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,611] INFO Added plugin 
> 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:47,651] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-jdbc 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-jdbc/} 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,491] INFO Added plugin 
> 'io.confluent.connect.jdbc.JdbcSinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,492] INFO Added plugin 
> 'io.confluent.connect.jdbc.JdbcSourceConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:52:49,663] INFO Loading plugin from: 
> /usr/share/java/kafka-connect-s3 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,055] INFO Registered loader: 
> PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-connect-s3/} 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,055] INFO Added plugin 
> 'io.confluent.connect.s3.S3SinkConnector' 
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
> [2018-01-30 13:53:51,061] INFO Added plugin 
> 'io.confluent.connect.storage.tools.SchemaSourceConnector' 
> 
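
As a side note on the reporter's load-by-name idea above: a minimal sketch of what such an existence check could look like, assuming the plugin is referenced by its fully-qualified class name and the plugin path's classloader is at hand (the method is hypothetical and is not how Connect's DelegatingClassLoader actually works):

{code:java}
// Hypothetical check: resolve a named plugin class directly instead of
// scanning every jar on the plugin path.
public static boolean pluginAvailable(String className, ClassLoader loader) {
    try {
        // initialize=false: resolve the class without running static initializers
        Class.forName(className, false, loader);
        return true;
    } catch (ClassNotFoundException | LinkageError e) {
        return false;
    }
}
{code}

One reason scanning is still needed: Connect must also discover plugins nobody has named yet, e.g. to serve the /connector-plugins REST endpoint, which a pure load-by-name check cannot do.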

[jira] [Commented] (KAFKA-6547) group offset reset and begin_offset ignored/no effect

2018-02-09 Thread Dan (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358415#comment-16358415
 ] 

Dan commented on KAFKA-6547:

I think I've eliminated my client from the problem definition. I start the 
client first to get the group started, then I stop it. Describe shows:
{code:java}
kafka@kafka-server-1:~$ KAFKA_OPTS="-Xmx128M 
-Djava.security.auth.login.config=./config/kafka_client_jaas.conf" 
kafka-consumer-groups.sh --command-config ./config/consumer.properties 
--bootstrap-server kafka-server-1:9092 --group meta-printer --describe
Note: This will not show information about old Zookeeper-based consumers.
 
Consumer group 'meta-printer' has no active members.
 
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
payload-meta    0          836464          1060485         224021  -            -     -

{code}
Then I run my client for about 20s, with an offset commit frequency of 10s. 
Here is a describe after that:
{code:java}
kafka@kafka-server-1:~$ KAFKA_OPTS="-Xmx128M 
-Djava.security.auth.login.config=./config/kafka_client_jaas.conf" 
kafka-consumer-groups.sh --command-config ./config/consumer.properties 
--bootstrap-server kafka-server-1:9092 --group meta-printer --describe
Note: This will not show information about old Zookeeper-based consumers.
 
Consumer group 'meta-printer' has no active members.
 
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
payload-meta    0          974424          1060485         86061   -            -     -
{code}
Now I'm going to try to reset the group to the earliest available:
{code:java}
kafka@kafka-server-1:~$ KAFKA_OPTS="-Xmx128M 
-Djava.security.auth.login.config=./config/kafka_client_jaas.conf" 
kafka-consumer-groups.sh --command-config ./config/consumer.properties 
--bootstrap-server kafka-server-1:9092 --group meta-printer --reset-offsets 
--execute --to-earliest --topic payload-meta
Note: This will not show information about old Zookeeper-based consumers.
 
 
TOPIC                          PARTITION  NEW-OFFSET     
payload-meta                   0          0              

{code}
And then a subsequent describe:
{code:java}
kafka@kafka-server-1:~$ KAFKA_OPTS="-Xmx128M 
-Djava.security.auth.login.config=./config/kafka_client_jaas.conf" 
kafka-consumer-groups.sh --command-config ./config/consumer.properties 
--bootstrap-server kafka-server-1:9092 --group meta-printer --describe
Note: This will not show information about old Zookeeper-based consumers.
 
Consumer group 'meta-printer' has no active members.
 
TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
payload-meta    0          974424          1060485         86061   -            -     -
{code}
Unchanged. The default server retention is forever, so no log should be 
pruned. But even if there were pruning, a reset to earliest should at least be 
able to get the offset back to the first offset shown above. The problem is 
not present in 0.11. If you want output of that, I'll have to set it up.

Am I misunderstanding what should be happening here?

 

Many thanks.
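
As a cross-check on the tool, the same reset can be attempted programmatically; a minimal sketch using only public consumer APIs, with broker, topic, and group taken from the describe output above (the SASL/security settings from consumer.properties are omitted for brevity):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ResetToEarliest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-server-1:9092");
        props.put("group.id", "meta-printer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("payload-meta", 0);
            consumer.assign(Collections.singletonList(tp));
            consumer.seekToBeginning(Collections.singletonList(tp));
            long earliest = consumer.position(tp); // forces the lazy seek to resolve
            // Commit the earliest offset for the group; a subsequent describe
            // should show CURRENT-OFFSET at this value.
            consumer.commitSync(
                Collections.singletonMap(tp, new OffsetAndMetadata(earliest)));
        }
    }
}
{code}

If this commit sticks while the tool's reset does not, that would point at the tool rather than the offset manager.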

> group offset reset and begin_offset ignored/no effect
> -
>
> Key: KAFKA-6547
> URL: https://issues.apache.org/jira/browse/KAFKA-6547
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0
> Environment: ubuntu 16, java 1.8
>Reporter: Dan
>Priority: Major
> Fix For: 0.11.0.2
>
>
> Use of kafka-consumer-groups.sh with --reset-offsets --execute <--to-earliest 
> or anything> has no effect in 1.0. When my group client connects and requests 
> a specific offset, or the earliest, there's no effect and the consumer is 
> unable to poll, so no messages arrive; even new ones are ignored.
> I installed 0.11 and these problems are not manifest.
> I'm unfamiliar with the internals and put the offset manager as the possible 
> component, but that's a guess.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6529) Broker leaks memory and file descriptors after sudden client disconnects

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358354#comment-16358354
 ] 

ASF GitHub Bot commented on KAFKA-6529:
---

rajinisivaram closed pull request #4517: KAFKA-6529: Stop file descriptor leak 
when client disconnects with staged receives
URL: https://github.com/apache/kafka/pull/4517
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 6bfcfd21a90..ed037b3a8f7 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -325,9 +325,9 @@ public void send(Send send) {
 } catch (Exception e) {
 // update the state for consistency, the channel will be 
discarded after `close`
 channel.state(ChannelState.FAILED_SEND);
-// ensure notification via `disconnected`
+// ensure notification via `disconnected` when `failedSends` 
are processed in the next poll
 this.failedSends.add(connectionId);
-close(channel, false);
+close(channel, false, false);
 if (!(e instanceof CancelledKeyException)) {
 log.error("Unexpected exception during send, closing 
connection {} and rethrowing exception {}",
 connectionId, e);
@@ -450,6 +450,7 @@ void pollSelectionKeys(Set selectionKeys,
 if (idleExpiryManager != null)
 idleExpiryManager.update(channel.id(), currentTimeNanos);
 
+boolean sendFailed = false;
 try {
 
 /* complete any connections that have finished their handshake 
(either normally or immediately) */
@@ -491,7 +492,13 @@ void pollSelectionKeys(Set selectionKeys,
 
 /* if channel is ready write to any sockets that have space in 
their buffer and for which we have data */
 if (channel.ready() && key.isWritable()) {
-Send send = channel.write();
+Send send = null;
+try {
+send = channel.write();
+} catch (Exception e) {
+sendFailed = true;
+throw e;
+}
 if (send != null) {
 this.completedSends.add(send);
 this.sensors.recordBytesSent(channel.id(), 
send.size());
@@ -500,7 +507,7 @@ void pollSelectionKeys(Set selectionKeys,
 
 /* cancel any defunct sockets */
 if (!key.isValid())
-close(channel, true);
+close(channel, true, true);
 
 } catch (Exception e) {
 String desc = channel.socketDescription();
@@ -510,7 +517,7 @@ else if (e instanceof AuthenticationException) // will be 
logged later as error
 log.debug("Connection with {} disconnected due to 
authentication exception", desc, e);
 else
 log.warn("Unexpected error from {}; closing connection", 
desc, e);
-close(channel, true);
+close(channel, !sendFailed, true);
 } finally {
 maybeRecordTimePerConnection(channel, channelStartTimeNanos);
 }
@@ -620,7 +627,7 @@ private void maybeCloseOldestConnection(long 
currentTimeNanos) {
 log.trace("About to close the idle connection from {} due 
to being idle for {} millis",
 connectionId, (currentTimeNanos - 
expiredConnection.getValue()) / 1000 / 1000);
 channel.state(ChannelState.EXPIRED);
-close(channel, true);
+close(channel, true, true);
 }
 }
 }
@@ -674,7 +681,7 @@ public void close(String id) {
 // There is no disconnect notification for local close, but 
updating
 // channel state here anyway to avoid confusion.
 channel.state(ChannelState.LOCAL_CLOSE);
-close(channel, false);
+close(channel, false, false);
 } else {
 KafkaChannel closingChannel = this.closingChannels.remove(id);
 // Close any closing channel, leave the channel in the state in 
which closing was triggered
@@ -694,7 +701,10 @@ public void close(String id) {
  * closed immediately. The channel will not be added to disconnected list 
and it is the
  * responsibility of the caller to handle disconnect 

[jira] [Updated] (KAFKA-6549) Deadlock while processing Controller Events

2018-02-09 Thread Rajini Sivaram (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-6549:
--
 Priority: Blocker  (was: Major)
Fix Version/s: 1.1.0
  Component/s: core

> Deadlock while processing Controller Events
> ---
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Blocker
> Fix For: 1.1.0
>
> Attachments: td.txt
>
>
> Stack traces from a single node test cluster that was deadlocked while 
> processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 
> nid=0x7d03 waiting on condition [0x7278b000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bccadf30> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
>  at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>  at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
>  - <0x000780054860> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>  
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 
> nid=0xad03 waiting on condition [0x73fd3000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bcc584a0> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
>  at 
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
>  at 
> kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
>  at 
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>  at 

[jira] [Created] (KAFKA-6551) Unbounded queues in WorkerSourceTask cause OutOfMemoryError

2018-02-09 Thread Gunnar Morling (JIRA)
Gunnar Morling created KAFKA-6551:
-

 Summary: Unbounded queues in WorkerSourceTask cause 
OutOfMemoryError
 Key: KAFKA-6551
 URL: https://issues.apache.org/jira/browse/KAFKA-6551
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Gunnar Morling


A Debezium user reported an {{OutOfMemoryError}} to us, with over 50,000 
messages in the {{WorkerSourceTask#outstandingMessages}} map.

This map is unbounded, and I can't see any way of "rate limiting" that would 
control how many records are added to it. Growth can only be limited 
indirectly, by reducing the offset flush interval, but since connectors can 
return large batches of messages from single {{poll()}} calls, that is not 
sufficient in all cases. Note the user reported this issue while snapshotting 
a database, i.e. a high number of records arrived in a very short period of 
time.

To solve the problem I'd suggest making this map backpressure-aware and thus 
preventing its indefinite growth, so that no further records are polled from 
the connector until messages have been taken out of the map again; see the 
sketch below.
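
One shape such backpressure could take is a simple permit scheme around the outstanding-message map; a minimal sketch, with all names hypothetical (this is not Connect's actual internals):

{code:java}
import java.util.concurrent.Semaphore;

// Bounds how many source records may be unacknowledged at once. The poll
// loop blocks in beforeSend() once the limit is reached, so the connector
// is not asked for more records until acks drain the backlog.
public class OutstandingMessageLimiter {
    private final Semaphore permits;

    public OutstandingMessageLimiter(int maxOutstanding) {
        this.permits = new Semaphore(maxOutstanding);
    }

    // Called before handing a record to the producer.
    public void beforeSend() throws InterruptedException {
        permits.acquire();
    }

    // Called from the producer callback when the record is acked or fails.
    public void afterAck() {
        permits.release();
    }
}
{code}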



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6550) UpdateMetadataRequest should be lazily created

2018-02-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6550?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358293#comment-16358293
 ] 

ASF GitHub Bot commented on KAFKA-6550:
---

huxihx opened a new pull request #4553: KAFKA-6550: UpdateMetadataRequest 
should be lazily created
URL: https://github.com/apache/kafka/pull/4553
 
 
   We should defer constructing the UpdateMetadataRequest.Builder until we 
   really need it.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> UpdateMetadataRequest should be lazily created
> --
>
> Key: KAFKA-6550
> URL: https://issues.apache.org/jira/browse/KAFKA-6550
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller
>Affects Versions: 1.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Minor
>
> In ControllerBrokerRequestBatch.sendRequestsToBrokers, there is no need to 
> eagerly construct the UpdateMetadataRequest.Builder, since sometimes 
> updateMetadataRequestBrokerSet is actually empty. In those cases, we should 
> defer the construction until we really need it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6550) UpdateMetadataRequest should be lazily created

2018-02-09 Thread huxihx (JIRA)
huxihx created KAFKA-6550:
-

 Summary: UpdateMetadataRequest should be lazily created
 Key: KAFKA-6550
 URL: https://issues.apache.org/jira/browse/KAFKA-6550
 Project: Kafka
  Issue Type: Improvement
  Components: controller
Affects Versions: 1.0.0
Reporter: huxihx
Assignee: huxihx


In ControllerBrokerRequestBatch.sendRequestsToBrokers, there is no need to 
eagerly construct the UpdateMetadataRequest.Builder, since sometimes 
updateMetadataRequestBrokerSet is actually empty. In those cases, we should 
defer the construction until we really need it.
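
A minimal sketch of the deferral, using a generic memoizing supplier rather than Kafka's actual internals (the {{Lazy}} helper and the call site below are illustrative only):

{code:java}
import java.util.function.Supplier;

// Builds the expensive value at most once, and only if get() is ever called.
final class Lazy<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private T value;

    Lazy(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized T get() {
        if (value == null)
            value = delegate.get();
        return value;
    }
}
{code}

At the call site the builder would be wrapped once, e.g. {{new Lazy<>(() -> buildUpdateMetadataRequest(...))}}, and {{get()}} invoked only when updateMetadataRequestBrokerSet turns out to be non-empty.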



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6548) Migrate committed offsets from ZooKeeper to Kafka

2018-02-09 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/KAFKA-6548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-6548.
-
Resolution: Not A Problem

Hi [~ppmanikandan...@gmail.com] 

it sounds like in principle you know what you want to do and are on the right 
track. I don't think this should be filed in JIRA; it is rather something that 
would be well placed on the Kafka users mailing list or Stack Overflow. 
Actually, I just found that you also posted this to 
[stackoverflow|https://stackoverflow.com/questions/48696705/migrate-zookeeper-offset-details-to-kafka]
 and received an answer there, so I'll close this issue as I think further 
discussion is better placed on SO.

> Migrate committed offsets from ZooKeeper to Kafka
> -
>
> Key: KAFKA-6548
> URL: https://issues.apache.org/jira/browse/KAFKA-6548
> Project: Kafka
>  Issue Type: Improvement
>  Components: offset manager
>Affects Versions: 0.10.0.0
> Environment: Windows
>Reporter: Manikandan P
>Priority: Minor
>
> We were using a previous version of Kafka (0.8.x) where all the offset 
> details were stored in ZooKeeper.
> Now we have moved to a newer version of Kafka (0.10.x) where all the topic 
> offset details are stored in Kafka itself.
> We have to move all the topic offset details from ZooKeeper to Kafka for an 
> existing application in production.
> Kafka is installed on a Windows machine, so we can't run 
> kafka-consumer-groups.sh from Windows.
> Please advise how to migrate committed offsets from ZooKeeper to Kafka.
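
For reference, the commonly documented migration path uses the old consumer's dual-commit switch; a sketch of the relevant settings, assuming the old (Scala) consumer is still in use during the transition (host and group names illustrative). Note also that the Kafka distribution ships Windows batch equivalents of the shell tools under bin\windows:

{code:java}
import java.util.Properties;

public class OffsetMigrationProps {
    // Phase 1 of the documented two-phase migration: commit offsets to Kafka
    // while still writing them to ZooKeeper, so consumers can roll one at a time.
    public static Properties migrationPhaseOne() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk-host:2181"); // illustrative host
        props.put("group.id", "my-group");              // illustrative group
        props.put("offsets.storage", "kafka");
        props.put("dual.commit.enabled", "true");
        // Phase 2: once every consumer in the group runs with the above,
        // set dual.commit.enabled to "false" to commit to Kafka only.
        return props;
    }
}
{code}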



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6549) Deadlock while processing Controller Events

2018-02-09 Thread Manikumar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-6549:
-
Summary: Deadlock while processing Controller Events  (was: Deadlock in 
ZookeeperClient while processing Controller Events)

> Deadlock while processing Controller Events
> ---
>
> Key: KAFKA-6549
> URL: https://issues.apache.org/jira/browse/KAFKA-6549
> Project: Kafka
>  Issue Type: Bug
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Major
> Attachments: td.txt
>
>
> Stack traces from a single node test cluster that was deadlocked while 
> processing controller Reelect and Expire events. Attached stack-trace.
> {quote}
> "main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 
> nid=0x7d03 waiting on condition [0x7278b000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bccadf30> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at 
> kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
>  at 
> kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
>  at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
>  at 
> kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
>  at 
> org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
>  at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)
> Locked ownable synchronizers:
>  - <0x000780054860> (a 
> java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
>  
> "controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 
> nid=0xad03 waiting on condition [0x73fd3000]
>  java.lang.Thread.State: WAITING (parking)
>  at sun.misc.Unsafe.park(Native Method)
>  - parking to wait for <0x0007bcc584a0> (a 
> java.util.concurrent.CountDownLatch$Sync)
>  at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
>  at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
>  at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
>  at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
>  at 
> kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
>  at 
> kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
>  at 
> kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
>  at 

[jira] [Created] (KAFKA-6549) Deadlock in ZookeeperClient while processing Controller Events

2018-02-09 Thread Manikumar (JIRA)
Manikumar created KAFKA-6549:


 Summary: Deadlock in ZookeeperClient while processing Controller 
Events
 Key: KAFKA-6549
 URL: https://issues.apache.org/jira/browse/KAFKA-6549
 Project: Kafka
  Issue Type: Bug
Reporter: Manikumar
Assignee: Manikumar
 Attachments: td.txt

Stack traces from a single node test cluster that was deadlocked while 
processing controller Reelect and Expire events. Attached stack-trace.

{quote}

"main-EventThread" #18 daemon prio=5 os_prio=31 tid=0x7f83e4285800 
nid=0x7d03 waiting on condition [0x7278b000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0007bccadf30> (a 
java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
 at 
kafka.controller.KafkaController$Expire.waitUntilProcessed(KafkaController.scala:1505)
 at 
kafka.controller.KafkaController$$anon$7.beforeInitializingSession(KafkaController.scala:163)
 at 
kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
 at 
kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2$$anonfun$apply$mcV$sp$6.apply(ZooKeeperClient.scala:365)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
 at 
kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply$mcV$sp(ZooKeeperClient.scala:365)
 at 
kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
 at 
kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$$anonfun$process$2.apply(ZooKeeperClient.scala:363)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
 at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:258)
 at 
kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$.process(ZooKeeperClient.scala:363)
 at 
org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:531)
 at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:506)

Locked ownable synchronizers:
 - <0x000780054860> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
 
"controller-event-thread" #42 prio=5 os_prio=31 tid=0x7f83e4293800 
nid=0xad03 waiting on condition [0x73fd3000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x0007bcc584a0> (a 
java.util.concurrent.CountDownLatch$Sync)
 at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:231)
 at kafka.zookeeper.ZooKeeperClient.handleRequests(ZooKeeperClient.scala:148)
 at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1439)
 at 
kafka.zk.KafkaZkClient.kafka$zk$KafkaZkClient$$retryRequestUntilConnected(KafkaZkClient.scala:1432)
 at 
kafka.zk.KafkaZkClient.registerZNodeChangeHandlerAndCheckExistence(KafkaZkClient.scala:1171)
 at 
kafka.controller.KafkaController$Reelect$.process(KafkaController.scala:1475)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply$mcV$sp(ControllerEventManager.scala:69)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread$$anonfun$doWork$1.apply(ControllerEventManager.scala:69)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:68)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

"kafka-shutdown-hook" #14 prio=5 os_prio=31 tid=0x7f83e29b1000 nid=0x560f 
waiting on condition [0x75208000]
 java.lang.Thread.State: WAITING (parking)
 at sun.misc.Unsafe.park(Native Method)
 - parking to wait for <0x000780054860> (a 
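
Distilled to its shape, the traces show one thread holding the client's write lock while awaiting a latch that only another, lock-blocked thread could count down. A minimal standalone illustration of that cycle (deliberately simplified; this is not the controller code):

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockLatchDeadlock {
    public static void main(String[] args) throws InterruptedException {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        CountDownLatch processed = new CountDownLatch(1);

        Thread zkEventThread = new Thread(() -> {
            lock.writeLock().lock();   // like ZooKeeperClientWatcher.process
            try {
                processed.await();     // like Expire.waitUntilProcessed
            } catch (InterruptedException ignored) {
            } finally {
                lock.writeLock().unlock();
            }
        });

        Thread controllerThread = new Thread(() -> {
            lock.readLock().lock();    // blocks: the writer above never releases
            try {
                processed.countDown(); // never reached
            } finally {
                lock.readLock().unlock();
            }
        });

        zkEventThread.start();
        Thread.sleep(100);             // let the event thread take the lock first
        controllerThread.start();      // the program now hangs forever
    }
}
{code}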

[jira] [Commented] (KAFKA-3438) Rack Aware Replica Reassignment should warn of overloaded brokers

2018-02-09 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358216#comment-16358216
 ] 

Andy Coates commented on KAFKA-3438:


Morning.

Any reason why you're emailing this to me?


On 8 Feb 2018, at 14:22, Damian Guy (JIRA)  wrote:


  [ 
https://issues.apache.org/jira/browse/KAFKA-3438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Damian Guy updated KAFKA-3438:
--
 Fix Version/s: (was: 1.1.0)
2.0.0

Rack Aware Replica Reassignment should warn of overloaded brokers
-

 Key: KAFKA-3438
 URL: https://issues.apache.org/jira/browse/KAFKA-3438
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.10.0.0
Reporter: Ben Stopford
Assignee: Vahid Hashemian
Priority: Major
 Fix For: 2.0.0


We've changed the replica reassignment code to be rack aware.
One problem that might catch users out would be that they rebalance the cluster 
using kafka-reassign-partitions.sh but their rack configuration means that some 
high proportion of replicas are pushed onto a single, or small number of, 
brokers.
This should be an easy problem to avoid, by changing the rack assignment 
information, but we should probably warn users if they are going to create 
something that is unbalanced.
So imagine I have a Kafka cluster of 12 nodes spread over two racks with rack 
awareness enabled. If I add a 13th machine, on a new rack, and run the 
rebalance tool, that new machine will get ~6x as many replicas as the least 
loaded broker.
Suggest a warning be added to the tool output when --generate is called: "The 
most loaded broker has 2.3x as many replicas as the least loaded broker. This 
is likely due to an uneven distribution of brokers across racks. You're 
advised to alter the rack config so there are approximately the same number of 
brokers per rack", and display the individual rack→#brokers and 
broker→#replicas data for the proposed move.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


> Rack Aware Replica Reassignment should warn of overloaded brokers
> -
>
> Key: KAFKA-3438
> URL: https://issues.apache.org/jira/browse/KAFKA-3438
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.10.0.0
>Reporter: Ben Stopford
>Assignee: Vahid Hashemian
>Priority: Major
> Fix For: 2.0.0
>
>
> We've changed the replica reassignment code to be rack aware.
> One problem that might catch users out would be that they rebalance the 
> cluster using kafka-reassign-partitions.sh but their rack configuration means 
> that some high proportion of replicas are pushed onto a single, or small 
> number of, brokers. 
> This should be an easy problem to avoid, by changing the rack assignment 
> information, but we should probably warn users if they are going to create 
> something that is unbalanced. 
> So imagine I have a Kafka cluster of 12 nodes spread over two racks with rack 
> awareness enabled. If I add a 13th machine, on a new rack, and run the 
> rebalance tool, that new machine will get ~6x as many replicas as the least 
> loaded broker. 
> Suggest a warning be added to the tool output when --generate is called: 
> "The most loaded broker has 2.3x as many replicas as the least loaded 
> broker. This is likely due to an uneven distribution of brokers across 
> racks. You're advised to alter the rack config so there are approximately 
> the same number of brokers per rack", and display the individual 
> rack→#brokers and broker→#replicas data for the proposed move.
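
The proposed warning only needs the replica counts implied by the candidate assignment; a minimal sketch, assuming the --generate output has been parsed into a partition→replica-brokers map (threshold and wording illustrative):

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AssignmentSkewCheck {
    // proposed: partition id -> ordered list of replica broker ids.
    // Brokers that receive no replicas at all will not appear in the counts.
    static void warnIfUnbalanced(Map<Integer, List<Integer>> proposed) {
        Map<Integer, Integer> replicasPerBroker = new HashMap<>();
        for (List<Integer> replicas : proposed.values())
            for (int broker : replicas)
                replicasPerBroker.merge(broker, 1, Integer::sum);
        if (replicasPerBroker.isEmpty())
            return;

        int max = Collections.max(replicasPerBroker.values());
        int min = Collections.min(replicasPerBroker.values());
        double ratio = (double) max / min;
        if (ratio > 1.5) // illustrative threshold
            System.out.printf(
                "Warning: the most loaded broker has %.1fx as many replicas as "
                + "the least loaded. This likely reflects an uneven number of "
                + "brokers per rack.%n", ratio);
    }
}
{code}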



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down

2018-02-09 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358171#comment-16358171
 ] 

huxihx commented on KAFKA-6446:
---

[~edosciullo] I tried this way and the fix passed on my local env. Since 
initTransactions is the very first method called when a transaction starts, 
it's probably okay for us to only refine this method.

However, even with this fix, the producer still failed to be closed after 
throwing TimeoutException, because the Sender thread got stuck and did not 
respond. I am not sure if it's fully safe to simply break out of the catch 
clause in `maybeSendTransactionalRequest`.
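
Until a fix lands, callers can at least bound their own wait; a sketch of a client-side guard, assuming a producer already configured with a transactional.id (timeout value illustrative). It shares the limitation described above: the Sender thread may still be stuck, so the close is best-effort:

{code:java}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.KafkaProducer;

public class GuardedInit {
    public static void initWithTimeout(KafkaProducer<?, ?> producer, long timeoutSec)
            throws Exception {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        Future<?> init = exec.submit(producer::initTransactions);
        try {
            init.get(timeoutSec, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
            init.cancel(true);                        // interrupt the blocked call
            producer.close(0, TimeUnit.MILLISECONDS); // best-effort close
            throw new IllegalStateException("initTransactions timed out", e);
        } finally {
            exec.shutdownNow();
        }
    }
}
{code}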

> KafkaProducer with transactionId endless waits when bootstrap server is down
> 
>
> Key: KAFKA-6446
> URL: https://issues.apache.org/jira/browse/KAFKA-6446
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Eduardo Sciullo
>Priority: Critical
> Attachments: Test.java
>
>
> When the bootstrap server is down, a KafkaProducer with a transactionId 
> waits endlessly on initTransactions.
> The timeouts don't apply to that operation: it doesn't honor the 
> {{TRANSACTION_TIMEOUT_CONFIG}}.
> Attached an example of my code to reproduce the scenario.
>  
> I opened this issue as suggested by [Gary 
> Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down

2018-02-09 Thread Eduardo Sciullo (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358168#comment-16358168
 ] 

Eduardo Sciullo commented on KAFKA-6446:


The same behavior appears for the commitTransaction, abortTransaction, etc. 
methods of KafkaProducer (they don't honor the {{TRANSACTION_TIMEOUT_CONFIG}}).

[~huxi_2b] I think that replacing await() with the timed version is a good idea.
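
For concreteness, the timed variant being discussed would look roughly like this; a distilled sketch, not Kafka's actual internals (the real result object is the internal TransactionalRequestResult; names here are illustrative):

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.errors.TimeoutException;

// Latch-backed result whose await gives up after a deadline instead of
// blocking forever when no broker ever answers.
final class TimedRequestResult {
    private final CountDownLatch latch = new CountDownLatch(1);

    void done() {
        latch.countDown();
    }

    void await(long timeout, TimeUnit unit) throws InterruptedException {
        if (!latch.await(timeout, unit))
            throw new TimeoutException(
                "Transactional request did not complete within " + timeout + " " + unit);
    }
}
{code}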

> KafkaProducer with transactionId endless waits when bootstrap server is down
> 
>
> Key: KAFKA-6446
> URL: https://issues.apache.org/jira/browse/KAFKA-6446
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Eduardo Sciullo
>Priority: Critical
> Attachments: Test.java
>
>
> When the bootstrap server is down, a KafkaProducer with a transactionId 
> waits endlessly on initTransactions.
> The timeouts don't apply to that operation: it doesn't honor the 
> {{TRANSACTION_TIMEOUT_CONFIG}}.
> Attached an example of my code to reproduce the scenario.
>  
> I opened this issue as suggested by [Gary 
> Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6446) KafkaProducer with transactionId endless waits when bootstrap server is down

2018-02-09 Thread huxihx (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16358163#comment-16358163
 ] 

huxihx commented on KAFKA-6446:
---

Sounds like it's easy to replace await() with the timed version. However, 
there is another serious problem in this case: the Sender thread gets stuck in 
the infinite loop in `maybeSendTransactionalRequest`, which prevents the 
thread from being closed. [~becket_qin] Does it make sense?

> KafkaProducer with transactionId endless waits when bootstrap server is down
> 
>
> Key: KAFKA-6446
> URL: https://issues.apache.org/jira/browse/KAFKA-6446
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.11.0.0, 1.0.0
>Reporter: Eduardo Sciullo
>Priority: Critical
> Attachments: Test.java
>
>
> When the bootstrap server is down, a KafkaProducer with a transactionId 
> waits endlessly on initTransactions.
> The timeouts don't apply to that operation: it doesn't honor the 
> {{TRANSACTION_TIMEOUT_CONFIG}}.
> Attached an example of my code to reproduce the scenario.
>  
> I opened this issue as suggested by [Gary 
> Russell|https://stackoverflow.com/users/1240763/gary-russell]
> [https://stackoverflow.com/questions/48226546/defaultkafkaproducerfactory-with-transactionidprefix-endless-waits-when-bootstra]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)