[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-12-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257805#comment-14257805
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 12/24/14 7:07 PM:
-

[~ewencp],

Thanks for the patch.  You may close this issue.  The only thing I have not 
tested is the rare case where a single broker runs out of file descriptors and the 
producer, under heavy load, requests more connections to that same broker.  According 
to the code, it will mark the node state as disconnected, and I am not sure whether 
data will still be sent over the already-established live connection.  

Another comment: there is no WARN or ERROR message logged when a 
connection fails.  Can we please change the log level in the following code to WARN 
(a sketch of the suggested change follows the code block)?  In production environments 
people set the log level to WARN or ERROR, so there is no visibility when there is a 
connection issue.  

{code}
    /**
     * Initiate a connection to the given node
     */
    private void initiateConnect(Node node, long now) {
        try {
            log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
            this.connectionStates.connecting(node.id(), now);
            selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                             this.socketSendBuffer, this.socketReceiveBuffer);
        } catch (IOException e) {
            /* attempt failed, we'll try again after the backoff */
            connectionStates.disconnected(node.id());
            /* maybe the problem is our metadata, update it */
            metadata.requestUpdate();
            log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
        }
    }
{code}
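
For illustration, a minimal sketch of the suggested change (mine, not the committed 
patch): only the log call in the catch block is raised from DEBUG to WARN so a failed 
connection attempt stays visible at production log levels.

{code}
} catch (IOException e) {
    /* attempt failed, we'll try again after the backoff */
    connectionStates.disconnected(node.id());
    /* maybe the problem is our metadata, update it */
    metadata.requestUpdate();
    /* suggested: log at WARN instead of DEBUG so the failure shows up when the log level is WARN/ERROR */
    log.warn("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
}
{code}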

Thanks for all your help !

Thanks,

Bhavesh  


was (Author: bmis13):
[~ewencp],

Thanks for the patch.  You may close this issue.  The only thing I have not 
tested is the rare case where a single broker runs out of file descriptors and the 
producer, under heavy load, requests more connections to that same broker.  According 
to the code, it will mark the node state as disconnected, and I am not sure whether 
data will still be sent over the already-established live connection.  

Thanks for all your help !

Thanks,

Bhavesh  

 [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network 
 connection is lost
 ---

 Key: KAFKA-1642
 URL: https://issues.apache.org/jira/browse/KAFKA-1642
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 0.8.2
Reporter: Bhavesh Mistry
Assignee: Ewen Cheslack-Postava
Priority: Blocker
 Fix For: 0.8.2

 Attachments: 
 0001-Initial-CPU-Hish-Usage-by-Kafka-FIX-and-Also-fix-CLO.patch, 
 KAFKA-1642.patch, KAFKA-1642.patch, KAFKA-1642_2014-10-20_17:33:57.patch, 
 KAFKA-1642_2014-10-23_16:19:41.patch


 I see my CPU spike to 100% when the network connection is lost for a while.  It 
 seems the network IO threads are very busy logging the following error message.  Is 
 this expected behavior?
 2014-09-17 14:06:16.830 [kafka-producer-network-thread] ERROR 
 org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
 producer I/O thread: 
 java.lang.IllegalStateException: No entry found for node -2
 at 
 org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:110)
 at 
 org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:99)
 at 
 org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:394)
 at 
 org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:380)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:174)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175)
 at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
 at java.lang.Thread.run(Thread.java:744)
 Thanks,
 Bhavesh



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-12-23 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14257686#comment-14257686
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 12/24/14 12:02 AM:
--

[~ewencp],

The patch indeed solves the high CPU problem reported by this bug.  I have tested 
with all brokers down, one broker down, and two brokers down (except for the last use 
case, where one of the brokers runs out of socket file descriptors, a rare case).  
I am sorry for the late response; I got busy with other stuff, so testing got 
delayed.

Here are some interesting Observations from YourKit:

0)  Overall, the patch has also brought down CPU consumption in the normal healthy 
(happy) case where everything is up and running.  With the old code (without the 
patch), I used to see about 10% of overall CPU used by the IO threads (4 in my 
case); it has dropped to 5% or less now with the patch.   

1)  When two brokers are down, I occasionally see the IO thread blocked.  
(I did not see this when only one broker is down.) 

{code}
kafka-producer-network-thread | rawlog [BLOCKED] [DAEMON]
org.apache.kafka.clients.producer.internals.Metadata.fetch() Metadata.java:70
java.lang.Thread.run() Thread.java:744
{code}

2)  The record-error-rate metric remains zero despite the following firewall rules.  
In my opinion, org.apache.kafka.clients.producer.Callback should have been invoked, 
but I did not see that happening with either one or two brokers down.  Should I file 
another issue for this?  Please confirm.

{code}

sudo ipfw add reject tcp from me to b1.ip dst-port 9092
sudo ipfw add reject tcp from me to b2.ip dst-port 9092

00100 reject tcp from me to b1.ip dst-port 9092
00200 reject tcp from me to b2.ip dst-port 9092
{code}

{code}
class LoggingCallBaHandler implements Callback {

    /**
     * A callback method the user can implement to provide asynchronous
     * handling of request completion. This method will be called when the
     * record sent to the server has been acknowledged. Exactly one of the
     * arguments will be non-null.
     *
     * @param metadata
     *            The metadata for the record that was sent (i.e. the
     *            partition and offset). Null if an error occurred.
     * @param exception
     *            The exception thrown during processing of this record.
     *            Null if no error occurred.
     */
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            exception.printStackTrace();
        }
    }
}
{code}

I do not see any exception at all on the console... not sure why?
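
For context, a minimal sketch (added for illustration, not part of the original 
comment) of how such a callback is registered with the 0.8.2 new producer and how the 
record-error-rate sensor can be read back from the producer's metrics map; the broker 
list and topic name are placeholders.

{code}
Properties props = new Properties();
props.put("bootstrap.servers", "b1.ip:9092,b2.ip:9092");   // placeholder broker list
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);

// the callback is passed per record; it should fire with a non-null exception when the send fails
producer.send(new ProducerRecord<byte[], byte[]>("rawlog", "test".getBytes()), new LoggingCallBaHandler());

// the record-error-rate sensor discussed above can be read back like this
for (Map.Entry<MetricName, ? extends Metric> entry : producer.metrics().entrySet()) {
    if (entry.getKey().name().equals("record-error-rate"))
        System.out.println(entry.getKey().name() + " = " + entry.getValue().value());
}
{code}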

3)  The application does NOT shut down gracefully when one or more brokers are down.  
(The IO thread never exits; this is a known issue.) 

{code}
SIGTERM handler daemon prio=5 tid=0x7f8bd79e4000 nid=0x17907 waiting for 
monitor entry [0x00011e906000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.Shutdown.exit(Shutdown.java:212)
- waiting to lock 0x00070008f7c0 (a java.lang.Class for 
java.lang.Shutdown)
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:744)

SIGTERM handler daemon prio=5 tid=0x7f8bd5159000 nid=0x1cb0b waiting for 
monitor entry [0x00011e803000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.Shutdown.exit(Shutdown.java:212)
- waiting to lock 0x00070008f7c0 (a java.lang.Class for 
java.lang.Shutdown)
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:744)

SIGTERM handler daemon prio=5 tid=0x7f8bdd147800 nid=0x15d0b waiting for 
monitor entry [0x00011e30a000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.Shutdown.exit(Shutdown.java:212)
- waiting to lock 0x00070008f7c0 (a java.lang.Class for 
java.lang.Shutdown)
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at java.lang.Thread.run(Thread.java:744)

SIGTERM handler daemon prio=5 tid=0x7f8bdf82 nid=0x770b waiting for 
monitor entry [0x00011e207000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.lang.Shutdown.exit(Shutdown.java:212)
- waiting to lock 0x00070008f7c0 (a java.lang.Class for 
java.lang.Shutdown)
at java.lang.Terminator$1.handle(Terminator.java:52)
at sun.misc.Signal$1.run(Signal.java:212)
at 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-12-08 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14239063#comment-14239063
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 12/9/14 6:53 AM:


[~stevenz3wu],

0.8.2 is very well tested and has worked well under heavy load.  This bug is rare and 
only happens when a broker or the network has an issue.  We have been producing about 7 
to 10 TB per day using this new producer, so 0.8.2 is very safe to use in 
production.  It has survived the peak traffic of the year on a large e-commerce 
site.  So I am fairly confident that the new Java API indeed does true 
round-robin and is much faster than the Scala-based API.

[~ewencp],  I will verify the patch by the end of this Friday, but do let me know 
your understanding based on my last comment.  The goal is to put this issue to rest 
and cover all the use cases.

Thanks,

Bhavesh


was (Author: bmis13):
[~stevenz3wu],

0.8.2 is very well tested and has worked well under heavy load.  This bug is rare and 
only happens when a broker or the network has an issue.  We have been producing about 7 
to 10 TB per day using this new producer, so 0.8.2 is very safe to use in 
production.  It has survived the peak traffic of the year on a large e-commerce 
site.  So I am fairly confident that the new Java API indeed does true 
round-robin and is much faster than the Scala-based API.

[~ewencp],  I will verify the patch by the end of this Friday, but do let me know 
your understanding based on my last comment.

Thanks,

Bhavesh



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-12-02 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14232061#comment-14232061
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 12/2/14 8:04 PM:


Hi  [~ewencp],

I will not have time to validate this patch till next week.  

Here are my comments:

1) The Producer.close() method issue is not addressed by the patch.  If the network 
connection is lost (or other events happen), the IO thread will not be killed and the 
close method hangs.  In the patch that I provided, I had a timeout for the join method 
and interrupted the IO thread.  I think we need a similar solution (a sketch follows 
below).
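
For illustration only, a minimal sketch of the bounded-close idea described above (a 
timeout on the join followed by an interrupt); the 30-second timeout and the exact 
shape of KafkaProducer.close() are assumptions, not the actual patch.

{code}
@Override
public void close() {
    log.trace("Closing the Kafka producer.");
    this.sender.initiateClose();
    try {
        // bounded wait instead of an unbounded join
        this.ioThread.join(30 * 1000L);
        if (this.ioThread.isAlive()) {
            // the IO thread did not finish in time; interrupt it so close() cannot hang forever
            this.ioThread.interrupt();
            this.ioThread.join();
        }
    } catch (InterruptedException e) {
        throw new KafkaException(e);
    }
    this.metrics.close();
    log.debug("The Kafka producer has closed.");
}
{code}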

2) Also, can we please add JMX monitoring for the IO thread so we know how fast it is 
running?  It would be great to add this; the run() method would report its duration to 
a metric in nanoseconds.
{code}
try {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    if (bean.isThreadCpuTimeSupported() && bean.isThreadCpuTimeEnabled()) {
        this.ioTheadCPUTime = metrics.sensor("iothread-cpu");
        this.ioTheadCPUTime.add("iothread-cpu-ms", "The Rate Of CPU Cycle used by iothead in NANOSECONDS",
                new Rate(TimeUnit.NANOSECONDS) {
                    public double measure(MetricConfig config, long now) {
                        return (now - metadata.lastUpdate()) / 1000.0;
                    }
                });
    }
} catch (Throwable th) {
    log.warn("Not able to set the CPU time... etc", th);
}
{code}

3)  Please check the final value of the timeout in *pollTimeout*; if it is constantly 
zero then we need to slow the IO thread down.

4)  A defensive back-off check is needed in the run() method for when the IO thread 
becomes aggressive.  

{code}

while (running) {
    long start = time.milliseconds();
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    } finally {
        long durationInMs = time.milliseconds() - start;
        // TODO: fix me here - do an exponential back-off sleep etc. to prevent
        // stealing CPU cycles here? How much... for the edge case...
        if (durationInMs < 200) {
            if (client.isAllRegistredNodesAreDown()) {
                countinuousRetry++;
                // TODO: make this constant a configuration. When do we reset this
                // interval so we can try aggressively again?
                sleepInMs = ((long) Math.pow(2, countinuousRetry) * 500);
            } else {
                sleepInMs = 500;
                countinuousRetry = 0;
            }

            // Wait until the desired next time arrives using nanosecond
            // accuracy timer (wait(time) isn't accurate enough on most platforms)
            try {
                // TODO: sleep is not a good solution...
                Thread.sleep(sleepInMs);
            } catch (InterruptedException e) {
                log.error("While sleeping someone interrupted this thread, probably the close() method on the producer");
            }
        }
    }
}
{code}

5)  When all nodes are disconnected, do you still want to spin the IO thread?

6)  Suppose a firewall rule allows only 2 concurrent TCP connections from the client 
to the brokers, and the client still has a live TCP connection to a node (broker) but 
new TCP connections are rejected.  Will the node state be marked as Disconnected in 
initiateConnect?  Is this case handled gracefully?

By the way, thank you very much for the quick reply and the new patch.  I 
appreciate your help.

Thanks,

Bhavesh 


was (Author: bmis13):
Hi  [~ewencp],

I will not have time to validate this patch till next week.  

Here are my comments:

1) You still have not addressed the Producer.close() method issue: if the network 
connection is lost (or other events happen), the IO thread will not be killed and the 
close method hangs.  In the patch that I provided, I had a timeout for the join method 
and interrupted the IO thread.  I think we need something similar here.
2) Also, can we please add JMX monitoring for the IO thread so we know how fast it is 
running?  It would be great to add this; the run() method would report its duration to 
a metric.
{code}
try {
    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
    if (bean.isThreadCpuTimeSupported() && bean.isThreadCpuTimeEnabled()) {
        this.ioTheadCPUTime = 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-26 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14226751#comment-14226751
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/26/14 8:08 PM:
-

[~ewencp],

Even with the following parameters set to long values, the state of the system is 
still impacted; it does not matter what reconnect.backoff.ms and retry.backoff.ms are 
set to.  Once the node state is removed, the timeout is set to 0.  Please see the 
following logs.  

#15 minutes
reconnect.backoff.ms=90
retry.backoff.ms=90
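
Below is a small sketch (added for illustration, not from the original comment) of how 
these back-off settings are passed to the new producer; the broker list is a 
placeholder and 900000 ms is simply the 15 minutes noted above.

{code}
Properties props = new Properties();
props.put("bootstrap.servers", "b1.ip:9092,b2.ip:9092");  // placeholder brokers
props.put("reconnect.backoff.ms", "900000");              // 15 minutes
props.put("retry.backoff.ms", "900000");                  // 15 minutes
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
{code}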

{code}
2014-11-26 11:01:27.898 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:02:27.903 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:03:27.903 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:04:27.903 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:05:27.904 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:06:27.905 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:07:27.906 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:08:27.908 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:09:27.908 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:10:27.909 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:11:27.909 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:12:27.910 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:13:27.911 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:14:27.912 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:15:27.914 Kafka Drop message topic=.rawlog
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 6 ms.
 2014-11-26 11:00:27.613 [kafka-producer-network-thread | heartbeat] ERROR 
org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
producer I/O thread: 
 2014-11-26 11:00:27.613 [kafka-producer-network-thread | rawlog] ERROR 
org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
producer I/O thread: 
java.lang.IllegalStateException: No entry found for node -1
at 
org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:131)
at 
org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:120)
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:407)
at 
org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:393)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:187)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:744)
java.lang.IllegalStateException: No entry found for node -3
at 
org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:131)
at 
org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:120)
at 
org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:407)
at 
org.apache.kafka.clients.NetworkClient.maybeUpdateMetadata(NetworkClient.java:393)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:187)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
at 
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
at java.lang.Thread.run(Thread.java:744)
 2014-11-26 11:00:27.613 [kafka-producer-network-thread | heartbeat] ERROR 
org.apache.kafka.clients.producer.internals.Sender - Uncaught error in kafka 
producer I/O thread: 
 2014-11-26 11:00:27.613 [kafka-producer-network-thread | error] ERROR 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223161#comment-14223161
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 6:57 PM:
-

[~ewencp],

The way to reproduce this is to simulate network instability by turning the network 
service on and off (or unplugging and replugging the physical cable).  Let it connect, 
see if it recovers, then disconnect and connect again, etc.; you will see the behavior 
again and again. 

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3 (we need to put before we remove it)
        // The next line is the problem: it loses the state of the previous attempt, triggers the
        // exception above, and the client keeps trying to connect to that node forever with the same exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                         this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of what the node status is, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for a node.  (Hence I 
have added an exponential sleep as a temporary solution to avoid stealing CPU cycles; 
I think we must protect it somehow and must check the execution time...)

Please let me know if you need more info; I will be more than happy to reproduce the 
bug, and we can have a conference call where I can show you the problem.

Based on a code diff I have done between the 0.8.1.1 tag and this code, I think this 
issue also occurs in 0.8.1.1 as well.

Thanks,
Bhavesh 




was (Author: bmis13):
[~ewencp],

The way to reproduce this is to simulate network instability by turning the network 
service on and off (or unplugging and replugging the physical cable).  Let it connect, 
see if it recovers, then disconnect and connect again, etc.; you will see the behavior 
again and again. 

The issue is also with connection state management:

{code}
private void initiateConnect(Node node, long now) {
    try {
        log.debug("Initiating connection to node {} at {}:{}.", node.id(), node.host(), node.port());
        // TODO FIX java.lang.IllegalStateException: No entry found for node -3 (we need to put before we remove it)
        // The next line is the problem: it loses the state of the previous attempt, triggers the
        // exception above, and the client keeps trying to connect to that node forever with the same exception.
        this.connectionStates.connecting(node.id(), now);
        selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                         this.socketSendBuffer, this.socketReceiveBuffer);
    } catch (IOException e) {
        /* attempt failed, we'll try again after the backoff */
        connectionStates.disconnectedWhenConnectting(node.id());
        /* maybe the problem is our metadata, update it */
        metadata.requestUpdate();
        log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e);
    }
}
{code}

In my opinion, regardless of what the node status is, the run() method needs to be 
safeguarded against stealing CPU cycles when there is no state for a node.  (Hence I 
have added an exponential sleep as a temporary solution to avoid stealing CPU cycles; 
I think we must protect it somehow and must check the execution time...)

Please let me know if you need more info; I will be more than happy to reproduce the 
bug, and we can have a conference call where I can show you the problem.

Based on a code diff I have done between the 0.8.1.1 tag and this code, I think this 
issue also occurs in 0.8.1.1 as well.

Thanks,
Bhavesh 




[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14223532#comment-14223532
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 9:22 PM:
-

[~ewencp],

Also, regarding KafkaProducer.close(): the method hangs forever because of the following 
loop.

{code}
Sender.java

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

KafkaProducer.java

    /**
     * Close this producer. This method blocks until all in-flight requests complete.
     */
    @Override
    public void close() {
        log.trace("Closing the Kafka producer.");
        this.sender.initiateClose();
        try {
            this.ioThread.join();  // THIS IS BLOCKED since ioThread does not give up, so it is all related in my opinion.
        } catch (InterruptedException e) {
            throw new KafkaException(e);
        }
        this.metrics.close();
        log.debug("The Kafka producer has closed.");
    }
{code}

The issue described in KAFKA-1788 is likely related, but if you look at the close() call 
stack, the calling thread that initiated close() will hang until the I/O thread 
dies (and it never dies while data is pending and the network is down).  

Thanks,

Bhavesh



was (Author: bmis13):
[~ewencp],

Also, regarding KafkaProducer.close(): the method hangs forever because of the following 
loop.

{code}
Sender.java

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

KafkaProducer.java

    /**
     * Close this producer. This method blocks until all in-flight requests complete.
     */
    @Override
    public void close() {
        log.trace("Closing the Kafka producer.");
        this.sender.initiateClose();
        try {
            this.ioThread.join();  // THIS IS BLOCKED since ioThread does not give up.
        } catch (InterruptedException e) {
            throw new KafkaException(e);
        }
        this.metrics.close();
        log.debug("The Kafka producer has closed.");
    }
{code}

The issue described in KAFKA-1788 is likely related, but if you look at the close() call 
stack, the calling thread that initiated close() will hang until the I/O thread 
dies (and it never dies while data is pending and the network is down).  

Thanks,

Bhavesh




[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223626#comment-14223626
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 10:16 PM:
--

Also, there is an issue in my experimental patch: I did not update lastConnectAttemptMs 
in the connecting() state method, which is needed to resolve the IllegalStateException:
{code}
/**
 * Enter the connecting state for the given node.
 * @param node The id of the node we are connecting to
 * @param now The current time.
 */
public void connecting(int node, long now) {
    NodeConnectionState nodeConn = nodeState.get(node);
    if (nodeConn == null) {
        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
    } else {
        nodeConn.state = ConnectionState.CONNECTING;
        nodeConn.lastConnectAttemptMs = now;  // this will capture and update the last connection attempt
    }
}
{code}


was (Author: bmis13):
Also, there is an issue in my last patch: I did not update lastConnectAttemptMs in connecting().

{code}
/**
 * Enter the connecting state for the given node.
 * @param node The id of the node we are connecting to
 * @param now The current time.
 */
public void connecting(int node, long now) {
    NodeConnectionState nodeConn = nodeState.get(node);
    if (nodeConn == null) {
        nodeState.put(node, new NodeConnectionState(ConnectionState.CONNECTING, now));
    } else {
        nodeConn.state = ConnectionState.CONNECTING;
        nodeConn.lastConnectAttemptMs = now;  // this will capture and update the last connection attempt
    }
}
{code}



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14223779#comment-14223779
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 1:31 AM:
-

[~ewencp],

Thanks for looking into this; I really appreciate your response.

Also, do you think the rapid connect/disconnect cycling is also due to incorrect node 
state management, i.e. in the connecting() and initiateConnect() methods?

Also, can we add some defensive coding to protect this tight infinite loop and throttle 
the CPU when an iteration's start-to-end duration falls below some threshold (xx ms)? 
That would actually prevent this issue. We hit this in production, so I just wanted to 
highlight the impact: 325% CPU and excessive logging.
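
A minimal sketch of the kind of throttling I mean, applied to the drain loop quoted earlier in this thread (the 5 ms threshold and the sleep are my illustration, not the actual Sender code):

{code}
// Sketch: if one iteration of the I/O loop finishes almost immediately (nothing useful
// could be done because the network is down), back off briefly instead of spinning at
// 100% CPU. The 5 ms threshold stands in for the "xx ms" above.
long minIterationMs = 5;
while (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0) {
    long start = time.milliseconds();
    try {
        run(start);
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
    long elapsed = time.milliseconds() - start;
    if (elapsed < minIterationMs) {
        try {
            Thread.sleep(minIterationMs - elapsed);   // throttle the tight loop
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}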

Thanks,

Bhavesh 


was (Author: bmis13):
[~ewencp],

Thanks for looking into this; I really appreciate your response.

Also, do you think the rapid connect/disconnect cycling is also due to incorrect node 
state management, i.e. in the connecting() and initiateConnect() methods?

Thanks,

Bhavesh 



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:37 AM:
-

[~ewencp],

I hope the above steps give you a comprehensive way to reproduce the problems with the 
run() method. It would be really great if we could make the client more resilient and 
robust, so that network and broker instability does not cause CPU spikes and degrade 
application performance. Hence, I would strongly suggest at least measuring how long 
run(time) takes and, based on some configuration, throttling the CPU just to be more 
defensive, or at least detecting that the I/O thread is burning CPU cycles.

By the way, the experimental patch still works for the steps described above, thanks to 
its hard-coded back-off.

Any time you have a patch or anything else to try, please let me know and I will test it 
out. Thanks again for your detailed analysis.

Please look into ClusterConnectionStates and how it manages the node state when 
disconnecting immediately.

Please also look at connecting(int node, long now) and this snippet (I feel the 
connecting() call needs to come before, not after, the selector call):
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);
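
To be concrete, here is a minimal sketch of the reordering I have in mind (a suggestion based on the snippet above, not a committed fix): record the CONNECTING state first, so a failure inside selector.connect() can never leave the node without an entry.

{code}
/* Sketch: mark the node as connecting before initiating the socket connection,
   so ClusterConnectionStates always has an entry for the node even if
   selector.connect() throws. */
this.connectionStates.connecting(node.id(), now);
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()),
                 this.socketSendBuffer, this.socketReceiveBuffer);
{code}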

Thanks,

Bhavesh  


was (Author: bmis13):
[~ewencp],

I hope the above steps give you a comprehensive way to reproduce the problems with the 
run() method. It would be really great if we could make the client more resilient and 
robust, so that network and broker instability does not cause CPU spikes and degrade 
application performance. Hence, I would strongly suggest at least measuring how long 
run(time) takes and, based on some configuration, throttling the CPU just to be more 
defensive, or at least detecting that the I/O thread is burning CPU cycles.

By the way, the experimental patch still works for the steps described above.


Thanks,

Bhavesh  



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:39 AM:
-

[~ewencp],

I hope the above steps give you a comprehensive way to reproduce the problems with the 
run() method. It would be really great if we could make the client more resilient and 
robust, so that network and broker instability does not cause CPU spikes and degrade 
application performance. Hence, I would strongly suggest at least measuring how long 
run(time) takes and keeping some stats; based on some configuration, we could throttle 
the CPU (if needed) just to be more defensive, or at least detect that the I/O thread is 
burning CPU cycles.

By the way, the experimental patch still works for the steps described above, thanks to 
its hard-coded back-off.

Any time you have a patch or anything else to try, please let me know and I will test it 
out (you have my email id). Once again, thanks for your detailed analysis and for looking 
at this on short notice.

Please look into ClusterConnectionStates and how it manages the node state when 
disconnecting immediately.

Please also look at connecting(int node, long now) and this snippet (I feel the 
connecting() call needs to come before, not after, the selector call):
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Thanks,

Bhavesh  


was (Author: bmis13):
[~ewencp],

I hope the above steps give you a comprehensive way to reproduce the problems with the 
run() method. It would be really great if we could make the client more resilient and 
robust, so that network and broker instability does not cause CPU spikes and degrade 
application performance. Hence, I would strongly suggest at least measuring how long 
run(time) takes and, based on some configuration, throttling the CPU just to be more 
defensive, or at least detecting that the I/O thread is burning CPU cycles.

By the way, the experimental patch still works for the steps described above, thanks to 
its hard-coded back-off.

Any time you have a patch or anything else to try, please let me know and I will test it 
out. Thanks again for your detailed analysis.

Please look into ClusterConnectionStates and how it manages the node state when 
disconnecting immediately.

Please also look at connecting(int node, long now) and this snippet (I feel the 
connecting() call needs to come before, not after, the selector call):
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Thanks,

Bhavesh  



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:40 AM:
-

[~ewencp],

I hope the above steps give you a comprehensive way to reproduce the problems with the 
run() method. It would be really great if we could make the client more resilient and 
robust, so that network and broker instability does not cause CPU spikes and degrade 
application performance. Hence, I would strongly suggest at least measuring how long 
run(time) takes and keeping some stats; based on some configuration, we could throttle 
the CPU (if needed) just to be more defensive, or at least detect that the I/O thread is 
burning CPU cycles.

By the way, the experimental patch still works for the steps described above, thanks to 
its hard-coded back-off.

Any time you have a patch or anything else to try, please let me know and I will test it 
out (you have my email id). Once again, thanks for your detailed analysis and for looking 
at this on short notice.

Please look into ClusterConnectionStates and how it manages the node state when 
disconnecting immediately.

Please also look at connecting(int node, long now) and this snippet (I feel the 
connecting() call needs to come before, not after, the selector call):
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Also, I still feel that producer.close() also needs to be looked at (the join() call 
should take a configurable timeout).

Thanks,

Bhavesh  


was (Author: bmis13):
[~ewencp],

I hope the above steps give you a comprehensive way to reproduce the problems with the 
run() method. It would be really great if we could make the client more resilient and 
robust, so that network and broker instability does not cause CPU spikes and degrade 
application performance. Hence, I would strongly suggest at least measuring how long 
run(time) takes and keeping some stats; based on some configuration, we could throttle 
the CPU (if needed) just to be more defensive, or at least detect that the I/O thread is 
burning CPU cycles.

By the way, the experimental patch still works for the steps described above, thanks to 
its hard-coded back-off.

Any time you have a patch or anything else to try, please let me know and I will test it 
out (you have my email id). Once again, thanks for your detailed analysis and for looking 
at this on short notice.

Please look into ClusterConnectionStates and how it manages the node state when 
disconnecting immediately.

Please also look at connecting(int node, long now) and this snippet (I feel the 
connecting() call needs to come before, not after, the selector call):
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Thanks,

Bhavesh  



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224046#comment-14224046
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 4:43 AM:
-

Also, are you going to back-port the patch to version 0.8.1.1 as well? Please let me know.

Thanks,
Bhavesh 


was (Author: bmis13):
Also, are you going to back-port the patch to version 0.8.1.1 as well? Please let me know.

Thanks,
Bhavesh 



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-24 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14224041#comment-14224041
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/25/14 5:37 AM:
-

[~ewencp],

I hope the above steps give you a comprehensive way to reproduce the problems with the 
run() method. It would be really great if we could make the client more resilient and 
robust, so that network and broker instability does not cause CPU spikes and degrade 
application performance. Hence, I would strongly suggest at least measuring how long 
run(time) takes and keeping some stats; based on some configuration, we could throttle 
the CPU (if needed) just to be more defensive, or at least detect that the I/O thread is 
burning CPU cycles.

By the way, the experimental patch still works for the steps described above, thanks to 
its hard-coded back-off.

Any time you have a patch or anything else to try, please let me know and I will test it 
out (you have my email id). Once again, thanks for your detailed analysis and for looking 
at this on short notice.

Please look into ClusterConnectionStates and how it manages the node state when 
disconnecting immediately.

Please also look at connecting(int node, long now) and this snippet (I feel the 
connecting() call needs to come before, not after, the selector call):
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Also, I still feel that producer.close() also needs to be looked at (the join() call 
should take a configurable timeout, so the calling thread does not hang).
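
A minimal sketch of what I mean (the timeout constant and the warning are my illustration; this is not the current KafkaProducer code):

{code}
// Sketch: bound the wait on the I/O thread instead of joining forever.
long closeTimeoutMs = 30000;                 // assumed value, would come from configuration
this.sender.initiateClose();
try {
    this.ioThread.join(closeTimeoutMs);      // wait at most closeTimeoutMs
    if (this.ioThread.isAlive())
        log.warn("Producer I/O thread did not shut down within {} ms", closeTimeoutMs);
} catch (InterruptedException e) {
    throw new KafkaException(e);
}
this.metrics.close();
{code}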

Thanks,

Bhavesh  


was (Author: bmis13):
[~ewencp],

I hope the above steps give you a comprehensive way to reproduce the problems with the 
run() method. It would be really great if we could make the client more resilient and 
robust, so that network and broker instability does not cause CPU spikes and degrade 
application performance. Hence, I would strongly suggest at least measuring how long 
run(time) takes and keeping some stats; based on some configuration, we could throttle 
the CPU (if needed) just to be more defensive, or at least detect that the I/O thread is 
burning CPU cycles.

By the way, the experimental patch still works for the steps described above, thanks to 
its hard-coded back-off.

Any time you have a patch or anything else to try, please let me know and I will test it 
out (you have my email id). Once again, thanks for your detailed analysis and for looking 
at this on short notice.

Please look into ClusterConnectionStates and how it manages the node state when 
disconnecting immediately.

Please also look at connecting(int node, long now) and this snippet (I feel the 
connecting() call needs to come before, not after, the selector call):
selector.connect(node.id(), new InetSocketAddress(node.host(), node.port()), this.socketSendBuffer, this.socketReceiveBuffer);
this.connectionStates.connecting(node.id(), now);

Also, I still feel that producer.close() also needs to be looked at (the join() call 
should take a configurable timeout).

Thanks,

Bhavesh  



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-11-23 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14222571#comment-14222571
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 11/24/14 1:31 AM:
-

The patch provided does not solve the problem. When you have more than one producer 
instance, the effect is amplified.

org.apache.kafka.clients.producer.internals.Sender.run() takes 100% CPU due to an 
infinite loop when there are no brokers (there is no work to be done to flush the data).


Thanks,
Bhavesh 


was (Author: bmis13):
The patch provided does not solve the problem. When you have more than one producer 
instance, the effect is amplified.

org.apache.kafka.clients.producer.internals.Sender.run() takes 100% CPU due to an 
infinite loop when there are no brokers.


Thanks,
Bhavesh 



[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:08 PM:
-

{code:title=TestNetworkDownProducer.java}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "logmon.test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        // 256-byte message payload
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append("a");

        // 4 producer instances shared by all sender threads
        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        // 200 threads, each sending through all 4 producers
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

{code:title=kafkaproducer.properties}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:09 PM:
-

{code}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        // 256-byte message payload
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append("a");

        // 4 producer instances shared by all sender threads
        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        // 200 threads, each sending through all 4 producers
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

Property File
{code}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all messages).

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:09 PM:
-

{code}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        // 256-byte message payload
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append("a");

        // 4 producer instances shared by all sender threads
        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        // 200 threads, each sending through all 4 producers
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}


{code}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all messages).

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:10 PM:
-

{code}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        // 256-byte message payload
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append("a");

        // 4 producer instances shared by all sender threads
        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        // 200 threads, each sending through all 4 producers
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

This is property file used:
{code}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 4:11 PM:
-

{code}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        // 256-byte message payload
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append("a");

        // 4 producer instances shared by all sender threads
        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        // 200 threads, each sending through all 4 producers
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

This is property file used:
{code}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-10-13 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14169415#comment-14169415
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 10/13/14 5:05 PM:
-

{code}


import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestNetworkDownProducer {

    static int numberTh = 200;
    static CountDownLatch latch = new CountDownLatch(200);

    public static void main(String[] args) throws IOException, InterruptedException {

        Properties prop = new Properties();
        InputStream propFile = Thread.currentThread().getContextClassLoader()
                .getResourceAsStream("kafkaproducer.properties");

        String topic = "test";
        prop.load(propFile);
        System.out.println("Property: " + prop.toString());

        // 256-byte message payload
        StringBuilder builder = new StringBuilder(1024);
        int msgLenth = 256;
        for (int i = 0; i < msgLenth; i++)
            builder.append("a");

        // 4 producer instances shared by all sender threads
        int numberOfProducer = 4;
        Producer[] producer = new Producer[numberOfProducer];
        for (int i = 0; i < producer.length; i++) {
            producer[i] = new KafkaProducer(prop);
        }

        // 200 threads, each sending through all 4 producers
        ExecutorService service = new ThreadPoolExecutor(numberTh, numberTh,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(numberTh * 2));
        for (int i = 0; i < numberTh; i++) {
            service.execute(new MyProducer(producer, 10, builder.toString(), topic));
        }
        latch.await();

        System.out.println("All Producers done...!");
        for (int i = 0; i < producer.length; i++) {
            producer[i].close();
        }
        service.shutdownNow();
        System.out.println("All done...!");
    }

    static class MyProducer implements Runnable {

        Producer[] producer;
        long maxloops;
        String msg;
        String topic;

        MyProducer(Producer[] list, long maxloops, String msg, String topic) {
            this.producer = list;
            this.maxloops = maxloops;
            this.msg = msg;
            this.topic = topic;
        }

        public void run() {
            ProducerRecord record = new ProducerRecord(topic, msg.toString().getBytes());
            Callback callBack = new MyCallback();
            try {
                for (long j = 0; j < maxloops; j++) {
                    try {
                        for (int i = 0; i < producer.length; i++) {
                            producer[i].send(record, callBack);
                        }
                        Thread.sleep(10);
                    } catch (Throwable th) {
                        System.err.println("FATAL ");
                        th.printStackTrace();
                    }
                }
            } finally {
                latch.countDown();
            }
        }
    }

    static class MyCallback implements Callback {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.err.println("Msg dropped..!");
                exception.printStackTrace();
            }
        }
    }
}
{code}

This is property file used:
{code}
# THIS IS FOR NEW PRODUCERS API TRUNK Please see the configuration at 
https://kafka.apache.org/documentation.html#newproducerconfigs
# Broker List
bootstrap.servers= BROKERS HERE...
#Data Acks
acks=1
# 64MB of Buffer for log lines (including all 

[jira] [Comment Edited] (KAFKA-1642) [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost

2014-09-25 Thread Bhavesh Mistry (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14148121#comment-14148121
 ] 

Bhavesh Mistry edited comment on KAFKA-1642 at 9/25/14 6:42 PM:


Hi [~jkreps],

I will work on the sample program. We are not setting the reconnect.backoff.ms and 
retry.backoff.ms configurations, so the defaults apply. The only other thing I can tell 
you is that I have 4 producer instances per JVM, so this might amplify the issue.
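
For reference, a minimal sketch of how those two settings could be overridden if we wanted to experiment (values and broker list are placeholders; today we rely on the defaults):

{code}
// Requires java.util.Properties and org.apache.kafka.clients.producer.{KafkaProducer, Producer}.
Properties prop = new Properties();
prop.put("bootstrap.servers", "broker1:9092,broker2:9092");  // placeholder broker list
prop.put("reconnect.backoff.ms", "100");                     // illustrative value; default is used today
prop.put("retry.backoff.ms", "100");                         // illustrative value; default is used today
Producer producer = new KafkaProducer(prop);
{code}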

Thanks,

Bhavesh 


was (Author: bmis13):
Hi [~jkreps],

I will work on the sample program. We are not setting the reconnect.backoff.ms and 
retry.backoff.ms configurations, so the defaults apply. The only other thing I can tell 
you is that I have 4 producer instances per JVM, so this might amplify the issue.

Thanks,

Bhavesh 
