I am using 0.8.1. The source is here:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
Here is the definition of disconnect():
private def disconnect() = {
  if(blockingChannel.isConnected) {
    debug("Disconnecting from " + host + ":" + port)
Here is the relevant stack trace:
java.nio.channels.UnresolvedAddressException: null
at sun.nio.ch.Net.checkAddress(Net.java:127) ~[na:1.7.0_55]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:644) ~[na:1.7.0_55]
at
Ewen, you are right: the patch was committed on Feb. 20th of last year. I
will leave a comment and close that ticket.
On Tue, Jan 27, 2015 at 7:24 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:
This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
will only be included in
Rajiv,
Which version of Kafka are you using? I just checked SimpleConsumer's code,
and in its close() function, disconnect() is called, which will close the
socket.
Guozhang
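
A note on the isConnected guard in disconnect(): the stack trace above fails inside SocketChannel.connect(), so the channel has presumably been opened but never connected. The sketch below is plain Java NIO, not Kafka's code, and only illustrates one way to make sure such a channel still gets closed. The class name ChannelCleanupSketch, the method connectOrClose, and the host name broker.invalid are made up for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

// Hypothetical helper, not part of Kafka.
final class ChannelCleanupSketch {

    /**
     * Connects the given channel. If connect() throws, the channel is closed
     * before the exception is rethrown, so the caller never holds a channel
     * that is open but unconnected.
     */
    static boolean connectOrClose(SocketChannel channel, InetSocketAddress address)
            throws IOException {
        try {
            return channel.connect(address);
        } catch (IOException | RuntimeException e) {
            // The channel never connected, so cleanup that only checks
            // isConnected() would skip it. Close it unconditionally here.
            channel.close();
            throw e;
        }
    }

    public static void main(String[] args) throws IOException {
        SocketChannel channel = SocketChannel.open();
        try {
            // createUnresolved() performs no DNS lookup; connect() rejects the address.
            connectOrClose(channel, InetSocketAddress.createUnresolved("broker.invalid", 9092));
        } catch (UnresolvedAddressException e) {
            System.out.println("connect failed, channel open: " + channel.isOpen());
        }
    }
}
```

Whether this matches what the Kafka fix actually changed is not something this thread shows.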
On Mon, Jan 26, 2015 at 2:36 PM, Rajiv Kurian ra...@signalfuse.com wrote:
Meant to write a run loop.
void run() {
This was fixed in commit 6ab9b1ecd8 for KAFKA-1235 and it looks like that
will only be included in 0.8.2.
Guozhang, it looks like you wrote the patch, Jun reviewed it, but the bug
is still open and there's a comment that moved it to 0.9 after the commit
was already made. Was the commit a mistake
Here is my typical flow:
void run() {
  if (simpleConsumer == null) {
    simpleConsumer = new SimpleConsumer(host, port, (int) kafkaSocketTimeout,
        kafkaReceiveBufferSize, clientName);
  }
  try {
    // Do stuff with simpleConsumer.
  } catch (Exception e) {
    if (simpleConsumer != null) {
Meant to write a run loop.
void run() {
  while (running) {
    if (simpleConsumer == null) {
      simpleConsumer = new SimpleConsumer(host, port,
          (int) kafkaSocketTimeout, kafkaReceiveBufferSize, clientName);
    }
    try {
      // Do stuff with simpleConsumer.
    } catch (Exception e) {
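
The run loop above is cut off in the archive. As a rough sketch of the pattern it seems to describe (create the client lazily, use it, and on any failure close it and drop the reference so the next iteration reconnects), here is a version with a hypothetical Client interface standing in for SimpleConsumer. The names ReconnectLoopSketch, Client, poll, and runOnce are assumptions for illustration, not Kafka API.

```java
import java.util.function.Supplier;

// Hypothetical sketch; Client stands in for a consumer such as SimpleConsumer.
final class ReconnectLoopSketch {

    interface Client {
        void poll() throws Exception; // "do stuff" with the client
        void close();                 // release the underlying socket
    }

    private final Supplier<Client> factory;
    private Client client;

    ReconnectLoopSketch(Supplier<Client> factory) {
        this.factory = factory;
    }

    /** Runs one loop iteration; returns false if the client failed and was discarded. */
    boolean runOnce() {
        if (client == null) {
            client = factory.get();
        }
        try {
            client.poll();
            return true;
        } catch (Exception e) {
            // Close and forget the client so the next iteration builds a fresh one.
            client.close();
            client = null;
            return false;
        }
    }
}
```

A caller would wrap runOnce() in its own `while (running)` loop, possibly with a back-off between failed iterations.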