[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-13 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14209628#comment-14209628
 ] 

Vishal commented on KAFKA-1745:
---

Any solutions?

 Each new thread creates a PIPE and KQUEUE as open files during 
 producer.send() and does not get cleared when the thread that creates them is 
 cleared.
 -

              Key: KAFKA-1745
              URL: https://issues.apache.org/jira/browse/KAFKA-1745
          Project: Kafka
       Issue Type: Bug
 Affects Versions: 0.8.1.1
      Environment: Mac OS Mavericks
         Reporter: Vishal
         Priority: Critical

 Hi,
 I'm using the Java client API for Kafka. I wanted to send data to Kafka
 using a producer pool, as I'm using a sync producer. The thread that sends
 the data comes from a thread pool that grows and shrinks depending on
 usage. When I send data from one thread, 1 KQUEUE and 2 PIPEs are
 created (I got this info using lsof). If I keep using the same thread it's
 fine, but when a new thread sends data to Kafka (using producer.send()), a new
 KQUEUE and 2 PIPEs are created.
 This is okay, but when the thread is cleared from the thread pool and a new
 thread is created, new KQUEUEs and PIPEs are created. The problem is
 that the old ones are not destroyed and still show up as open files. This is
 a major problem, as the number of open files keeps increasing and never
 decreases.
 Please suggest any solutions.
 FYI, the number of TCP connections established from the producer system to
 the Kafka broker remains constant throughout.
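
 The lsof observation above can also be checked from inside the JVM. The
 following is a minimal sketch, not part of the original report: it reads the
 process's open file descriptor count through
 com.sun.management.UnixOperatingSystemMXBean (exposed by HotSpot-based JVMs on
 Unix-like systems) and takes a reading before, during and after the lifetime
 of a java.nio.channels.Selector. The class name FdCount and the choice of a
 Selector as the test resource are assumptions made for illustration; nothing
 in this thread confirms which code path opens the KQUEUE and PIPE descriptors.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
import java.nio.channels.Selector;

public class FdCount {

    // Returns the number of open file descriptors of this JVM process,
    // or -1 when the platform MXBean does not expose that value.
    static long openFds() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                    .getOpenFileDescriptorCount();
        }
        return -1;
    }

    // Returns {before, whileOpen, afterClose} readings around one Selector.
    static long[] measure() {
        try {
            // Warm up so that class loading does not distort the first reading.
            Selector.open().close();

            long before = openFds();
            Selector selector = Selector.open(); // holds native descriptors
            long whileOpen = openFds();
            selector.close();                    // releases them again
            long afterClose = openFds();
            return new long[] {before, whileOpen, afterClose};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long[] m = measure();
        System.out.println("before=" + m[0]
                + " whileOpen=" + m[1]
                + " afterClose=" + m[2]);
    }
}
```

 Logging openFds() periodically from the test program would give the same
 trend as repeated lsof runs, without leaving the process.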



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-06 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817
 ] 

Vishal edited comment on KAFKA-1745 at 11/6/14 8:07 AM:


[~junrao] Sorry, I should have been more clear.
[~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a 
sample test case which reproduces the problem (check lsof every 10 seconds 
to see the number of KQUEUEs and PIPEs increase even though the producer 
object is being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    // Producers are pooled and reused across tasks.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static ProducerConfig config;

    static
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        // Only used as a Runnable; the pool's own worker threads execute it.
        Thread run = new Thread(new Runnable() {

            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if(producer == null)
                {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while(true) //To make sure that the main program does not terminate
        {
            for(int i = 0; i < 100; i++)
            {
                tpe.submit(run);
            }
            //10 seconds, so that the keepalive time is exceeded by the
            //thread pool and the threads are cleared.
            Thread.sleep(10000);
        }
    }

}
{code}


Though the KQUEUEs and PIPEs do get cleared after some time (in this test 
case, after 1-2 minutes), why do new ones have to be created when a new 
thread accesses the producer object? 
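
One plausible explanation, which nobody in this thread has confirmed, is that
some resource is cached per thread rather than per producer. The sketch below
illustrates that pattern with plain JDK classes only: a ThreadLocal holding a
java.nio.channels.Selector (on macOS a selector is backed by a kqueue plus a
wakeup pipe) is initialised once for every thread that touches it, even though
all threads call the same shared method. PerThreadResource, send() and run()
are hypothetical names chosen for illustration; this is not Kafka's code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadResource {

    // Counts how many selectors have been opened in total.
    static final AtomicInteger opened = new AtomicInteger();

    // One selector per thread, created lazily on that thread's first access.
    static final ThreadLocal<Selector> SELECTOR = ThreadLocal.withInitial(() -> {
        try {
            opened.incrementAndGet();
            return Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    });

    // Stand-in for a shared send path that needs the calling thread's selector.
    static void send() {
        SELECTOR.get().isOpen();
    }

    // Calls send() twice on each of `threads` fresh threads and returns how
    // many selectors were opened while doing so.
    static int run(int threads) {
        int start = opened.get();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                send();
                send(); // second call reuses this thread's selector
                try {
                    SELECTOR.get().close(); // release descriptors explicitly
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                SELECTOR.remove();
            });
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return opened.get() - start;
    }

    public static void main(String[] args) {
        System.out.println("selectors opened by 3 threads: " + run(3));
    }
}
```

If the explicit close() were left out, the descriptors would stay open until
the dead thread's ThreadLocal value was garbage collected, which would look
like a delayed cleanup in lsof.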



[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-06 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817
 ] 

Vishal edited comment on KAFKA-1745 at 11/6/14 12:00 PM:
-

[~junrao] Sorry, I should have been more clear.
[~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a 
sample test case which reproduces the problem (check lsof every 10 seconds 
to see the number of KQUEUEs and PIPEs increase even though the producer 
object is being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    // Producers are pooled and reused across tasks.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static ProducerConfig config;

    static
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        // Only used as a Runnable; the pool's own worker threads execute it.
        Thread run = new Thread(new Runnable() {

            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if(producer == null)
                {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while(true) //To make sure that the main program does not terminate
        {
            for(int i = 0; i < 100; i++)
            {
                tpe.submit(run);
            }
            //10 seconds, so that the keepalive time is exceeded by the
            //thread pool and the threads are cleared.
            Thread.sleep(10000);
        }
    }

}
{code}


Though the KQUEUEs and PIPEs do get cleared after some time (in this test 
case, after 1-2 minutes), why does Kafka's API create new ones when a new 
thread accesses the producer object? 



[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-06 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817
 ] 

Vishal edited comment on KAFKA-1745 at 11/6/14 12:04 PM:
-

[~junrao] Sorry, I should have been more clear.
[~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a 
sample test case which reproduces the problem (check lsof every 10 seconds 
to see the number of KQUEUEs and PIPEs increase even though the producer 
object is being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    // Producers are pooled and reused across tasks.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static ProducerConfig config;

    static
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        // Only used as a Runnable; the pool's own worker threads execute it.
        Thread run = new Thread(new Runnable() {

            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if(producer == null)
                {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while(true) //To make sure that the main program does not terminate
        {
            for(int i = 0; i < 100; i++)
            {
                tpe.submit(run);
            }
            //10 seconds, so that the keep alive time set on the thread pool
            //will be exceeded and the threads in the thread pool will be
            //cleared after each iteration.
            Thread.sleep(10000);
        }
    }

}
{code}


Though the KQUEUEs and PIPEs do get cleared after some time (in this test 
case, after 1-2 minutes), why does Kafka's API create new ones when a new 
thread accesses the producer object? 



[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-06 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817
 ] 

Vishal edited comment on KAFKA-1745 at 11/6/14 12:03 PM:
-

[~junrao] Sorry, I should have been more clear.
[~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a 
sample test case which reproduces the problem (check lsof every 10 seconds 
to see the number of KQUEUEs and PIPEs increase even though the producer 
object is being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    // Producers are pooled and reused across tasks.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static ProducerConfig config;

    static
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        // Only used as a Runnable; the pool's own worker threads execute it.
        Thread run = new Thread(new Runnable() {

            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if(producer == null)
                {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while(true) //To make sure that the main program does not terminate
        {
            for(int i = 0; i < 100; i++)
            {
                tpe.submit(run);
            }
            //10 seconds, so that the keep alive time set on the thread pool
            //will be exceeded and the threads in the thread pool will be
            //cleared.
            Thread.sleep(10000);
        }
    }

}
{code}


Though the KQUEUEs and PIPEs do get cleared after some time (in this test 
case, after 1-2 minutes), why does Kafka's API create new ones when a new 
thread accesses the producer object? 



[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908
 ] 

Vishal commented on KAFKA-1745:
---

No, since I figured that calling producer.close() and returning that producer 
object to the pool would make that producer object unusable afterwards.
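
The concern above, that a closed producer must not go back into the pool, can
be kept separate from the question of when to close at all. The following is a
minimal sketch under assumptions, not code from this issue: a generic pool that
never closes an instance on return and only closes whatever is idle when the
pool itself is shut down. CloseOnShutdownPool, borrow(), giveBack() and
shutdown() are hypothetical names, and a StringBuilder stands in for the
producer so that the example runs without Kafka on the classpath.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class CloseOnShutdownPool<T> {

    private final Queue<T> idle = new ConcurrentLinkedQueue<T>();
    private final Supplier<T> factory; // creates an instance when none is idle
    private final Consumer<T> closer;  // releases an instance for good

    public CloseOnShutdownPool(Supplier<T> factory, Consumer<T> closer) {
        this.factory = factory;
        this.closer = closer;
    }

    // Hands out an idle instance if there is one, otherwise a new one.
    public T borrow() {
        T item = idle.poll();
        return item != null ? item : factory.get();
    }

    // Returns a still-usable instance to the pool without closing it.
    public void giveBack(T item) {
        idle.add(item);
    }

    // Closes every idle instance and returns how many were closed.
    public int shutdown() {
        int closed = 0;
        for (T item = idle.poll(); item != null; item = idle.poll()) {
            closer.accept(item);
            closed++;
        }
        return closed;
    }

    public static void main(String[] args) {
        CloseOnShutdownPool<StringBuilder> pool =
                new CloseOnShutdownPool<StringBuilder>(
                        StringBuilder::new,
                        sb -> System.out.println("closing " + sb));
        StringBuilder first = pool.borrow();
        first.append("producer-1");
        pool.giveBack(first);
        System.out.println("reused: " + (pool.borrow() == first));
        pool.giveBack(first);
        System.out.println("closed on shutdown: " + pool.shutdown());
    }
}
```

With the producer from the test case, the factory would presumably be
`() -> new Producer<String, String>(config)` and the closer a call to the
producer's close() method; that wiring is untested here.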



[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14197908#comment-14197908
 ] 

Vishal edited comment on KAFKA-1745 at 11/5/14 8:56 AM:


No, since I figured that calling producer.close() and returning that producer 
object to the pool would make that producer object unusable afterwards.




[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal updated KAFKA-1745:
--
Description: 
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka 
using a producer pool, as I'm using a sync producer. The thread that sends the 
data comes from a thread pool that grows and shrinks depending on usage. 
When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (I got 
this info using lsof). If I keep using the same thread it's fine, but when a 
new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 
PIPEs are created.

This is okay, but when the thread is cleared from the thread pool and a new 
thread is created, new KQUEUEs and PIPEs are created. The problem is that 
the old ones are not destroyed and still show up as open files. This is a 
major problem, as the number of open files keeps increasing and never 
decreases.

Please suggest any solutions.

FYI, the number of TCP connections established remains constant throughout.
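
The thread churn described above can be reproduced without Kafka. The sketch
below is an illustration under assumptions, not part of the report: it uses a
one-thread ThreadPoolExecutor with allowCoreThreadTimeOut(true) and a short
keep-alive, idles past the keep-alive between tasks, and counts how many
distinct worker threads ended up running the tasks. ThreadChurn and
distinctWorkers() are hypothetical names, and the 50 ms / 500 ms timings are
arbitrary values picked to keep the demo short.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadChurn {

    // Runs `bursts` single-task bursts on a one-thread pool, idling past the
    // keep-alive time after each burst, and returns the number of distinct
    // worker threads that executed a task.
    static int distinctWorkers(int bursts) {
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 1, 50,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true); // idle core threads may terminate
        Set<Thread> workers = ConcurrentHashMap.newKeySet();
        Runnable task = () -> workers.add(Thread.currentThread());
        try {
            for (int i = 0; i < bursts; i++) {
                tpe.submit(task).get(); // wait until the task has run
                Thread.sleep(500);      // well past the 50 ms keep-alive
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            tpe.shutdown();
        }
        return workers.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct worker threads: " + distinctWorkers(3));
    }
}
```

Every burst that follows an idle period longer than the keep-alive runs on a
freshly created thread, so anything that is set up once per thread is set up
again for each such burst.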



[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal updated KAFKA-1745:
--
Description: 
Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka 
using a producer pool, as I'm using a sync producer. The thread that sends the 
data comes from a thread pool that grows and shrinks depending on usage. 
When I send data from one thread, 1 KQUEUE and 2 PIPEs are created (I got 
this info using lsof). If I keep using the same thread it's fine, but when a 
new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 
PIPEs are created.

This is okay, but when the thread is cleared from the thread pool and a new 
thread is created, new KQUEUEs and PIPEs are created. The problem is that 
the old ones are not destroyed and still show up as open files. This is a 
major problem, as the number of open files keeps increasing and never 
decreases.

Please suggest any solutions.

FYI, the number of TCP connections established from the producer system to the 
Kafka broker remains constant throughout.



[jira] [Commented] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817
 ] 

Vishal commented on KAFKA-1745:
---

[~junrao] Sorry, I should have been more clear.
[~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a 
sample test case which reproduces the problem (check lsof every 10 seconds 
to see the number of KQUEUEs and PIPEs increase even though the producer 
object is being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    // Producers are pooled and reused across tasks.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static ProducerConfig config;

    static
    {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        // Only used as a Runnable; the pool's own worker threads execute it.
        Thread run = new Thread(new Runnable() {

            @Override
            public void run() {
                Producer<String, String> producer = producerPool.poll();
                if(producer == null)
                {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while(true) //To make sure that the main program does not terminate
        {
            for(int i = 0; i < 100; i++)
            {
                tpe.submit(run);
            }
            //10 seconds, so that the keepalive time is exceeded by the
            //thread pool and the threads are cleared.
            Thread.sleep(10000);
        }
    }

}
{code}



[jira] [Comment Edited] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-05 Thread Vishal (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14199817#comment-14199817
 ] 

Vishal edited comment on KAFKA-1745 at 11/6/14 5:25 AM:


[~junrao] Sorry, I should have been more clear.
[~ewencp] I checked and I don't think I'm leaking producers. Anyway, this is a 
sample test case which can replicate the problem (check lsof every 10 seconds 
to notice the increase in KQUEUEs and PIPESs though the producer object is 
being reused):

{code}
import java.util.Properties;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class Test {

    // Producers are pooled and reused across tasks.
    private static Queue<Producer<String, String>> producerPool =
            new ConcurrentLinkedQueue<Producer<String, String>>();

    private static ProducerConfig config;

    static {
        Properties props = new Properties();
        props.put("metadata.broker.list", "IP:Port");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        config = new ProducerConfig(props);
    }

    public static void main(String[] args) throws InterruptedException {

        // 10 core threads that are allowed to time out after 5 idle seconds.
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(10, 10, 5,
                TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        tpe.allowCoreThreadTimeOut(true);
        Thread run = new Thread(new Runnable() {

            @Override
            public void run() {
                // Reuse a pooled producer if one is available.
                Producer<String, String> producer = producerPool.poll();
                if (producer == null) {
                    producer = new Producer<String, String>(config);
                }
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>("SaSMQ", "0", "test");
                producer.send(data);
                producerPool.add(producer);
            }
        });

        while (true) { // To make sure that the main program does not terminate
            for (int i = 0; i < 100; i++) {
                tpe.submit(run);
            }
            // 10 seconds, so that the keepalive time is exceeded by the
            // thread pool and the threads are cleared.
            Thread.sleep(10000);
        }
    }

}
{code}


Though the KQUEUE and PIPEs do get cleared after some time, why does a new 
set have to be created when a new thread accesses the producer object? 
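
As a complement to checking lsof by hand, the open-descriptor count can also be sampled from inside the JVM. The following is a minimal sketch, not part of the original test case: the class name FdMonitor and the one-second sampling interval are hypothetical, and it assumes a HotSpot/OpenJDK runtime on a Unix-like OS where the platform MXBean implements com.sun.management.UnixOperatingSystemMXBean. It reports the total number of open descriptors only, so lsof is still needed to tell KQUEUEs and PIPEs apart from sockets and regular files.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;

// Hypothetical helper (not from the original report): samples the number of
// file descriptors currently open in this JVM.
class FdMonitor {

    // Returns the open file descriptor count, or -1 if the platform MXBean
    // does not expose it (for example on a non-Unix JVM).
    static long openFdCount() {
        OperatingSystemMXBean os = ManagementFactory.getOperatingSystemMXBean();
        if (os instanceof com.sun.management.UnixOperatingSystemMXBean) {
            return ((com.sun.management.UnixOperatingSystemMXBean) os)
                    .getOpenFileDescriptorCount();
        }
        return -1L;
    }

    public static void main(String[] args) throws InterruptedException {
        // Print a few samples; the interval here is an arbitrary choice.
        for (int i = 0; i < 3; i++) {
            System.out.println("open file descriptors: " + openFdCount());
            Thread.sleep(1000);
        }
    }
}
```

Calling FdMonitor.openFdCount() once per iteration of the while loop in the test case above would log the trend alongside the lsof output.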



[jira] [Created] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does no0t get cleared when the thread that creates them is cleared.

2014-11-03 Thread Vishal (JIRA)
Vishal created KAFKA-1745:
-

 Summary: Each new thread creates a PIPE and KQUEUE as open files 
during producer.send() and does no0t get cleared when the thread that creates 
them is cleared.
 Key: KAFKA-1745
 URL: https://issues.apache.org/jira/browse/KAFKA-1745
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
 Environment: Mac OS Mavericks
Reporter: Vishal
Priority: Critical


Hi,
I'm using the Java client API for Kafka. I wanted to send data to Kafka by 
using a producer pool as I'm using a sync producer. The thread that sends the 
data is from the thread pool that grows and shrinks depending on the usage. So, 
when I try to send data from one thread, 1 KQUEUE and 2 PIPEs are created (I 
got this info from lsof). If I keep using the same thread, it's fine, but when 
a new thread sends data to Kafka (using producer.send()), a new KQUEUE and 2 
PIPEs are created.

This is okay, but when the thread is cleared from the thread pool and a new 
thread is created, new KQUEUEs and PIPEs are created as well. The problem is 
that the old ones are not destroyed and still show up as open files. This is a 
major problem because the number of open files keeps increasing and never 
decreases.

Please suggest any solutions.
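
The description notes that sending repeatedly from the same thread is fine. One possible way to act on that observation, offered here as an untested assumption rather than a confirmed fix for this issue, is to submit sends to a fixed-size pool whose threads never time out, so that the set of sending threads stays stable. The sketch below leaves Kafka out entirely: the class name StableSenderPool is hypothetical and the task body is only a placeholder for producer.send(). It just counts how many distinct threads end up running the tasks.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration (not from the original report).
class StableSenderPool {

    // Submits `tasks` placeholder tasks to a fixed-size pool and returns the
    // number of distinct threads that executed them.
    static int distinctThreads(int poolSize, int tasks) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        // A fixed pool has no idle timeout: its threads live until shutdown.
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    // Placeholder for producer.send(data) in a real application.
                    threadNames.add(Thread.currentThread().getName());
                });
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads used: " + distinctThreads(10, 100));
    }
}
```

The number of distinct threads never exceeds the pool size, however many tasks are submitted. The trade-off is that idle threads are kept alive instead of being released when the load drops.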





[jira] [Updated] (KAFKA-1745) Each new thread creates a PIPE and KQUEUE as open files during producer.send() and does not get cleared when the thread that creates them is cleared.

2014-11-03 Thread Vishal (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vishal updated KAFKA-1745:
--
Summary: Each new thread creates a PIPE and KQUEUE as open files during 
producer.send() and does not get cleared when the thread that creates them is 
cleared.  (was: Each new thread creates a PIPE and KQUEUE as open files during 
producer.send() and does no0t get cleared when the thread that creates them is 
cleared.)

 Each new thread creates a PIPE and KQUEUE as open files during 
 producer.send() and does not get cleared when the thread that creates them is 
 cleared.
 -

 Key: KAFKA-1745
 URL: https://issues.apache.org/jira/browse/KAFKA-1745
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.1.1
 Environment: Mac OS Mavericks
Reporter: Vishal
Priority: Critical

 Hi,
 I'm using the Java client API for Kafka. I wanted to send data to Kafka 
 by using a producer pool as I'm using a sync producer. The thread that sends 
 the data is from the thread pool that grows and shrinks depending on the 
 usage. So, when I try to send data from one thread, 1 KQUEUE and 2 PIPEs are 
 created (I got this info from lsof). If I keep using the same thread, it's 
 fine, but when a new thread sends data to Kafka (using producer.send()), a new 
 KQUEUE and 2 PIPEs are created.
 This is okay, but when the thread is cleared from the thread pool and a new 
 thread is created, new KQUEUEs and PIPEs are created as well. The problem is 
 that the old ones are not destroyed and still show up as open files. This is 
 a major problem because the number of open files keeps increasing and never 
 decreases.
 Please suggest any solutions.


