Re: latency - how to reduce?

2015-01-18 Thread Andrew Ehrlich

What instance types are you using in EC2? Are the drives EBS?

On 1/5/15 11:06 PM, Shlomi Hazan wrote:

Will do. What did you have in mind? Just write a big file to disk and
measure the time it took to write? Maybe also read back? Using specific
APIs?
Apart from the local Win machine case, are you aware of any issues with
Amazon EC2 instances that may be causing that same latency in production?
Thanks,
Shlomi

On Tue, Jan 6, 2015 at 4:04 AM, Jun Rao  wrote:


Not setting "log.flush.interval.messages" is good since the default gives
the best latency. Could you do some basic I/O testing on the local FS on
your Windows machine to make sure the I/O latency is OK?

Thanks,

Jun

On Thu, Jan 1, 2015 at 1:40 AM, Shlomi Hazan  wrote:


Happy new year!
I did not set "log.flush.interval.messages".
I also could not find a default value in the docs.
Could you explain about that?
Thanks,
Shlomi

On Thu, Jan 1, 2015 at 2:20 AM, Jun Rao  wrote:


What's your setting of log.flush.interval.messages on the broker?

Thanks,

Jun

On Mon, Dec 29, 2014 at 3:26 AM, Shlomi Hazan wrote:

Hi,
I am using 0.8.1.1, and I have hundreds of msec latency at best and even
seconds at worst.
I have this latency both in production (with a peak load of 30K msg/sec,
replication = 2 across 5 brokers, acks = 1), and on the local Windows
machine using just one process for each of producer, zookeeper, kafka,
consumer.
Also tried batch.num.messages=1 and producer.type=sync on the local machine,
but saw no improvement.
How can I push latency down to several millis, at least when running
locally?

Thanks,
Shlomi






Re: kafka brokers going down within 24 hrs

2015-01-18 Thread Tousif
Here are the logs from brokers 0 and 1; they were captured when broker 1
went down.

http://paste.ubuntu.com/9782553/
http://paste.ubuntu.com/9782554/


I'm using Netty in Storm, and here are the configs:

storm.messaging.transport: "backtype.storm.messaging.netty.Context"
storm.messaging.netty.buffer_size: 209715200
storm.messaging.netty.max_retries: 10
storm.messaging.netty.max_wait_ms: 5000
storm.messaging.netty.min_wait_ms: 1






On Sat, Jan 17, 2015 at 1:24 AM, Harsha  wrote:

> Tousif,
> I meant to say that if the Kafka broker is going down often, it's better
> to analyze the root cause of the crash. Using supervisord
> to monitor the Kafka broker is fine; sorry about the confusion.
> -Harsha
> On Fri, Jan 16, 2015, at 11:25 AM, Gwen Shapira wrote:
> > Those errors are expected - if broker 10.0.0.11 went down, it will
> > reset the connection and the other broker will close the socket.
> > However, it looks like 10.0.0.11 crashes every two minutes?
> >
> > Do you have the logs from 10.0.0.11?
> >
> > On Thu, Jan 15, 2015 at 9:51 PM, Tousif  wrote:
> > > I'm using Kafka 2.9.2-0.8.1.1 and ZooKeeper 3.4.6.
> > > I noticed that only one broker is going down.
> > > My message size is less than 3 KB, KAFKA_HEAP_OPTS="-Xmx512M",
> > > and KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseCompressedOops
> > > -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled
> > > -XX:+CMSScavengeBeforeRemark -XX:+DisableExplicitGC
> > > -Djava.awt.headless=true".
> > >
> > >  Do you mean the Kafka broker never goes down, and does the broker
> > > start automatically after failing?
> > > I see only these errors on both the brokers.
> > >
> > > 10.0.0.11 is the broker which is going down.
> > >
> > > ERROR Closing socket for /10.0.0.11 because of error
> > > (kafka.network.Processor)
> > > java.io.IOException: Connection reset by peer
> > > at sun.nio.ch.FileDispatcher.read0(Native Method)
> > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> > > at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> > > at kafka.utils.Utils$.read(Utils.scala:375)
> > > at
> > >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > at kafka.network.Processor.read(SocketServer.scala:347)
> > > at kafka.network.Processor.run(SocketServer.scala:245)
> > > at java.lang.Thread.run(Thread.java:662)
> > > [2015-01-16 11:01:48,173] INFO Closing socket connection to /10.0.0.11
> .
> > > (kafka.network.Processor)
> > > [2015-01-16 11:03:08,164] ERROR Closing socket for /10.0.0.11 because
> of
> > > error (kafka.network.Processor)
> > > java.io.IOException: Connection reset by peer
> > > at sun.nio.ch.FileDispatcher.read0(Native Method)
> > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> > > at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> > > at kafka.utils.Utils$.read(Utils.scala:375)
> > > at
> > >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > at kafka.network.Processor.read(SocketServer.scala:347)
> > > at kafka.network.Processor.run(SocketServer.scala:245)
> > > at java.lang.Thread.run(Thread.java:662)
> > > [2015-01-16 11:03:08,280] INFO Closing socket connection to /10.0.0.11
> .
> > > (kafka.network.Processor)
> > > [2015-01-16 11:03:48,369] ERROR Closing socket for /10.0.0.11 because
> of
> > > error (kafka.network.Processor)
> > > java.io.IOException: Connection reset by peer
> > > at sun.nio.ch.FileDispatcher.read0(Native Method)
> > > at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> > > at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> > > at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> > > at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245)
> > > at kafka.utils.Utils$.read(Utils.scala:375)
> > > at
> > >
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> > > at kafka.network.Processor.read(SocketServer.scala:347)
> > > at kafka.network.Processor.run(SocketServer.scala:245)
> > > at java.lang.Thread.run(Thread.java:662)
> > >
> > >
> > >
> > > On Thu, Jan 15, 2015 at 7:49 PM, Harsha  wrote:
> > >
> > >> Tousif,
> > >> Which version of Kafka and ZooKeeper are you using, and what's your
> > >> message size and the JVM heap size you allocated for the Kafka brokers?
> > >> There is only 1 ZooKeeper node; if it's a production cluster, I
> > >> recommend you have a quorum of ZooKeeper nodes. Both Kafka and Storm
> > >> are heavy users of ZooKeeper. Also, supervisord is recommended for
> > >> Storm; I am not sure you need it for Kafka. For Storm, it's the
> > >> fail-fast nature of workers that requires supervisord to restart them.
> > >> When kafka goes down firs

Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-18 Thread Joe Stein
It works OK in Gradle but fails if you're using Maven.

Taking a look at the patch you uploaded now.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/

On Sun, Jan 18, 2015 at 8:59 PM, Jun Rao  wrote:

> There seems to be an issue with the pom file for the kafka_2.11-0.8.2 jar. It
> references scala-library 2.11, which doesn't exist in Maven Central
> (2.11.1, etc. do exist). This seems to be an issue in the 0.8.2 beta as
> well. I tried to reference the kafka_2.11-0.8.2 beta in a project and the build
> failed because scala-library:jar:2.11 doesn't exist. Filed KAFKA-1876 as an
> 0.8.2 blocker. It would be great if people familiar with Scala can take a
> look and see if this is a real issue.
>
> Thanks,
>
> Jun
>
> On Tue, Jan 13, 2015 at 7:16 PM, Jun Rao  wrote:
>
> > This is the first candidate for release of Apache Kafka 0.8.2.0. There
> > have been some changes since the 0.8.2 beta release, especially in the new
> > Java producer API and JMX MBean names. It would be great if people can test
> > this out thoroughly. We are giving people 10 days for testing and voting.
> >
> > Release Notes for the 0.8.2.0 release
> > *
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/RELEASE_NOTES.html
> > <
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/RELEASE_NOTES.html
> >*
> >
> > *** Please download, test and vote by Friday, Jan 23rd, 7pm PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/KEYS
> > * in
> > addition to the md5, sha1
> > and sha2 (SHA256) checksum.
> >
> > * Release artifacts to be voted upon (source and binary):
> > *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/
> > *
> >
> > * Maven artifacts to be voted upon prior to release:
> > *
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/maven_staging/
> > <
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/maven_staging/
> >*
> >
> > * scala-doc
> > *
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/scaladoc/#package
> > <
> https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/scaladoc/#package
> >*
> >
> > * java-doc
> > *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/javadoc/
> > *
> >
> > * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
> > *
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=b0c7d579f8aeb5750573008040a42b7377a651d5
> > <
> https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=b0c7d579f8aeb5750573008040a42b7377a651d5
> >*
> >
> > /***
> >
> > Thanks,
> >
> > Jun
> >
>


Re: [VOTE] 0.8.2.0 Candidate 1

2015-01-18 Thread Jun Rao
There seems to be an issue with the pom file for the kafka_2.11-0.8.2 jar. It
references scala-library 2.11, which doesn't exist in Maven Central
(2.11.1, etc. do exist). This seems to be an issue in the 0.8.2 beta as
well. I tried to reference the kafka_2.11-0.8.2 beta in a project and the build
failed because scala-library:jar:2.11 doesn't exist. Filed KAFKA-1876 as an
0.8.2 blocker. It would be great if people familiar with Scala can take a
look and see if this is a real issue.

Thanks,

Jun

On Tue, Jan 13, 2015 at 7:16 PM, Jun Rao  wrote:

> This is the first candidate for release of Apache Kafka 0.8.2.0. There
> have been some changes since the 0.8.2 beta release, especially in the new
> Java producer API and JMX MBean names. It would be great if people can test
> this out thoroughly. We are giving people 10 days for testing and voting.
>
> Release Notes for the 0.8.2.0 release
> *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/RELEASE_NOTES.html
> *
>
> *** Please download, test and vote by Friday, Jan 23rd, 7pm PT
>
> Kafka's KEYS file containing PGP keys we use to sign the release:
> *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/KEYS
> * in
> addition to the md5, sha1
> and sha2 (SHA256) checksum.
>
> * Release artifacts to be voted upon (source and binary):
> *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/
> *
>
> * Maven artifacts to be voted upon prior to release:
> *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/maven_staging/
> *
>
> * scala-doc
> *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/scaladoc/#package
> *
>
> * java-doc
> *https://people.apache.org/~junrao/kafka-0.8.2.0-candidate1/javadoc/
> *
>
> * The tag to be voted upon (off the 0.8.2 branch) is the 0.8.2.0 tag
> *https://git-wip-us.apache.org/repos/asf?p=kafka.git;a=tag;h=b0c7d579f8aeb5750573008040a42b7377a651d5
> *
>
> /***
>
> Thanks,
>
> Jun
>


can't iterate consumed messages when checking errorCode first

2015-01-18 Thread Manu Zhang
Hi all,

I'm using the Kafka low-level consumer API, and in the code below
"iterator.hasNext" always returns false. Through debugging, I'm sure the
messageSet has size "fetchSize".

  val consumer = new SimpleConsumer(broker.host, broker.port,
    soTimeout, soBufferSize, clientId)
  val request = new FetchRequestBuilder()
    .addFetch(topic, partition, offset, fetchSize)
    .build()
  val response = consumer.fetch(request)
  response.errorCode(topic, partition) match {
    case NoError =>
      iterator = response.messageSet(topic, partition).iterator
    case error => throw exceptionFor(error)
  }

The weird thing is that the iterator works fine when I get the iterator
directly, without checking the error code.

  val consumer = new SimpleConsumer(broker.host, broker.port,
    soTimeout, soBufferSize, clientId)
  val request = new FetchRequestBuilder()
    .addFetch(topic, partition, offset, fetchSize)
    .build()
  consumer.fetch(request).messageSet(topic, partition).iterator

Any thoughts?

Thanks,
Manu Zhang


Re: Query regarding Kafka publishing

2015-01-18 Thread Jun Rao
Serialization does happen before partitioning. There is no particular
reason for this in the old producer, since serialization and partitioning
are independent there. In the new producer, partitioning is based on a hash
of the serialized key, so partitioning has to be done after serialization.
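A minimal sketch of the ordering Jun describes (the serializer and the hash below are illustrative stand-ins, not Kafka's actual code; the new producer hashes the serialized key bytes with murmur2):

```python
import hashlib

def serialize(key):
    # Stand-in for a configured key serializer.
    return key.encode("utf-8")

def choose_partition(serialized_key, num_partitions):
    # The partition is derived from the *serialized* bytes, which is why
    # serialization must run first. (md5 here is just a stable stand-in
    # for the producer's real murmur2 hash.)
    digest = hashlib.md5(serialized_key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

key_bytes = serialize("user-42")            # step 1: serialize
partition = choose_partition(key_bytes, 6)  # step 2: partition on the bytes
print(partition)                            # stable value in [0, 6)
```

The point is only the dependency direction: any stable hash of the serialized bytes forces serialization to come first.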

Thanks,

Jun

On Fri, Jan 16, 2015 at 10:57 AM, Liju John  wrote:

> Hi ,
>
> I have a general query -
>
> As per the code in the Kafka producer, serialization happens before
> partitioning. Is my understanding correct? If yes, what's the reason for
> it?
>
> Regards,
> Liju John
>


Re: latency - how to reduce?

2015-01-18 Thread Jun Rao
You probably want to test at least the sequential write latency (with
flush) on your file system.
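A rough sketch of such a test (illustrative only; the file name and sizes are arbitrary, not from this thread) is to time a sequential write that is forced to the device after every chunk:

```python
import os
import time

def flushed_write_seconds(path, chunks=64, chunk_bytes=1 << 20):
    """Sequentially write `chunks` blocks of `chunk_bytes` each, forcing
    every block to disk with fsync, and return the elapsed seconds."""
    block = b"\x00" * chunk_bytes
    start = time.perf_counter()
    with open(path, "wb") as f:
        for _ in range(chunks):
            f.write(block)
            f.flush()
            os.fsync(f.fileno())  # make sure the OS actually hits the device
    elapsed = time.perf_counter() - start
    os.remove(path)
    return elapsed

if __name__ == "__main__":
    secs = flushed_write_seconds("io-latency-test.bin")
    # 64 chunks of 1 MB = 64 MB total
    print("%.1f MB/s overall, %.2f ms per flushed 1 MB chunk"
          % (64 / secs, secs / 64 * 1000))
```

If each flushed chunk takes tens of milliseconds here, similar per-flush latency on the broker's log directory would be expected.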

Thanks,

Jun

On Mon, Jan 12, 2015 at 6:08 AM, Shlomi Hazan  wrote:

> Thank you for the guidance, yet I want to know whether I am doing the test
> Jun had in mind.
>
> On Thu, Jan 8, 2015 at 5:36 PM, Jayesh Thakrar  >
> wrote:
>
> > I do see the Windows-based scripts in the tar file, but haven't tried
> > them. You should find them under bin/windows.
> > Also, you can always use other Windows stress-testing tools/suites to
> > check your local I/O performance.
> >   From: Shlomi Hazan 
> >  To: users@kafka.apache.org; Jayesh Thakrar 
> >  Sent: Thursday, January 8, 2015 6:25 AM
> >  Subject: Re: latency - how to reduce?
> >
> > I would like to test locally first, as it is easier than setting up a test
> > cluster to model production, yet the script kafka-producer-perf-test is
> > not available for Windows.
> > Jun, what kind of "basic I/O testing on the local FS" did you have in
> > mind?
> >
> > Thanks,
> > Shlomi
> >
> >
> >
> >
> > > On Tue, Jan 6, 2015 at 5:40 PM, Jayesh Thakrar wrote:
> >
> > > Have you tried using the built-in stress test scripts?
> > > bin/kafka-producer-perf-test.sh
> > > bin/kafka-consumer-perf-test.sh
> > >
> > > Here's how I stress tested them -
> > > nohup ${KAFKA_HOME}/bin/kafka-producer-perf-test.sh --broker-list
> > > ${KAFKA_SERVERS} --topic ${TOPIC_NAME} --new-producer --threads 16
> > > --messages 1 1>kafka-producer-perf-test.sh.log 2>&1  &
> > >
> > > nohup ${KAFKA_HOME}/bin/kafka-consumer-perf-test.sh --zookeeper
> > > ${ZOOKEEPER_QUORUM} --topic ${TOPIC_NAME} --threads 16
> > > 1>kafka-consumer-perf-test.sh.log 2>&1  &
> > >
> > > And I used screen scraping of the JMX UI screens to push metrics into
> > > TSDB to get the following. The rate below is per second, so I could
> > > push the Kafka cluster to 140k+ messages/sec on a 4-node cluster with
> > > very little utilization (<30%).
> > >
> > >
> > >  From: Shlomi Hazan 
> > >  To: users@kafka.apache.org
> > >  Sent: Tuesday, January 6, 2015 1:06 AM
> > >  Subject: Re: latency - how to reduce?
> > >
> > > Will do. What did you have in mind? Just write a big file to disk and
> > > measure the time it took to write? Maybe also read back? Using specific
> > > APIs?
> > > Apart from the local Win machine case, are you aware of any issues with
> > > Amazon EC2 instances that may be causing that same latency in
> > > production?
> > > Thanks,
> > > Shlomi
> > >
> > >
> > >
> > > On Tue, Jan 6, 2015 at 4:04 AM, Jun Rao  wrote:
> > >
> > > > Not setting "log.flush.interval.messages" is good since the default
> > > > gives the best latency. Could you do some basic I/O testing on the
> > > > local FS on your Windows machine to make sure the I/O latency is OK?
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Jan 1, 2015 at 1:40 AM, Shlomi Hazan 
> wrote:
> > > >
> > > > > Happy new year!
> > > > > I did not set "log.flush.interval.messages".
> > > > > I also could not find a default value in the docs.
> > > > > Could you explain about that?
> > > > > Thanks,
> > > > > Shlomi
> > > > >
> > > > > On Thu, Jan 1, 2015 at 2:20 AM, Jun Rao  wrote:
> > > > >
> > > > > > What's your setting of log.flush.interval.messages on the broker?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > On Mon, Dec 29, 2014 at 3:26 AM, Shlomi Hazan 
> > > > wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > I am using 0.8.1.1, and I have hundreds of msec latency at best
> > > > > > > and even seconds at worst.
> > > > > > > I have this latency both in production (with a peak load of 30K
> > > > > > > msg/sec, replication = 2 across 5 brokers, acks = 1), and on the
> > > > > > > local Windows machine using just one process for each of
> > > > > > > producer, zookeeper, kafka, consumer.
> > > > > > > Also tried batch.num.messages=1 and producer.type=sync on the
> > > > > > > local machine, but saw no improvement.
> > > > > > > How can I push latency down to several millis, at least when
> > > > > > > running locally?
> > > > > > > Thanks,
> > > > > > > Shlomi
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > >
> >
> >
> >
> >
>


Re: connection error among nodes

2015-01-18 Thread Jun Rao
Any issue with the network?

Thanks,

Jun

On Wed, Jan 7, 2015 at 1:59 PM, Sa Li  wrote:

> One thing bothers me: sometimes the errors won't pop up, and sometimes
> they do. Why?
>
> On Wed, Jan 7, 2015 at 1:49 PM, Sa Li  wrote:
>
> >
> > Hi, Experts
> >
> > Our cluster is a 3-node cluster. I simply test the producer locally; see:
> >
> > bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance
> > test-rep-three 100 3000 -1 acks=1 bootstrap.servers=
> 10.100.98.100:9092
> > buffer.memory=67108864 batch.size=8196
> >
> > But I got the error below. I do think this is a critical issue; it just
> > temporarily loses the connection and gets it back. What is the reason for
> > this?
> >
> > [2015-01-07 21:44:14,180] WARN Error in I/O with voluminous-mass.master/
> > 10.100.98.101 (org.apache.kafka.common.network.Selector)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> > at
> org.apache.kafka.common.network.Selector.poll(Selector.java:232)
> > at
> > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> > at java.lang.Thread.run(Thread.java:745)
> > [2015-01-07 21:44:14,190] WARN Error in I/O with voluminous-mass.master/
> > 10.100.98.101 (org.apache.kafka.common.network.Selector)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> > at
> org.apache.kafka.common.network.Selector.poll(Selector.java:232)
> > at
> > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> > at java.lang.Thread.run(Thread.java:745)
> > [2015-01-07 21:44:14,200] WARN Error in I/O with voluminous-mass.master/
> > 10.100.98.101 (org.apache.kafka.common.network.Selector)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> > at
> org.apache.kafka.common.network.Selector.poll(Selector.java:232)
> > at
> > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> > at java.lang.Thread.run(Thread.java:745)
> > [2015-01-07 21:44:14,210] WARN Error in I/O with voluminous-mass.master/
> > 10.100.98.101 (org.apache.kafka.common.network.Selector)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> > at
> org.apache.kafka.common.network.Selector.poll(Selector.java:232)
> > at
> > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> > at java.lang.Thread.run(Thread.java:745)
> > [2015-01-07 21:44:14,220] WARN Error in I/O with voluminous-mass.master/
> > 10.100.98.101 (org.apache.kafka.common.network.Selector)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> > at
> org.apache.kafka.common.network.Selector.poll(Selector.java:232)
> > at
> > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:184)
> > at
> > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115)
> > at java.lang.Thread.run(Thread.java:745)
> > [2015-01-07 21:44:14,230] WARN Error in I/O with voluminous-mass.master/
> > 10.100.98.101 (org.apache.kafka.common.network.Selector)
> > java.net.ConnectException: Connection refused
> > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> > at
> > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> > at
> org.apache.kafka.common.network.Selector.poll(Selector.java:232)
> > at
> > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:191)
> > at
> > org.a

Re: config for consumer and producer

2015-01-18 Thread Jun Rao
For the consumer group setting, you probably want to read up the doc on
this a bit (http://kafka.apache.org/documentation.html#introduction). For
the producer, you need to know what config options the C# client provides.
Our documentation is only for the java producer.
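To illustrate the group semantics the doc describes (the file names and the second group id below are made up for the example): consumers started with the same group.id split the topic's partitions among themselves, while consumers with different group.ids each independently receive every message.

```
# consumer-a.properties and consumer-b.properties -- same group.id:
# these two processes divide the topic's partitions between them.
group.id=test-consumer-group

# consumer-c.properties -- a different group.id:
# this process independently receives every message.
group.id=audit-consumer-group
```

So you define one consumer group per independent application that must see the full stream, not one per process.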

Thanks,

Jun

On Tue, Jan 6, 2015 at 2:27 PM, Sa Li  wrote:

> Hi, All
>
> I am testing and making changes to server.properties. I wonder whether I
> need to specifically change the values in the consumer and producer
> properties as well. Here is the consumer.properties:
>
> zookeeper.connect=10.100.98.100:2181,10.100.98.101:2181,10.100.98.102:2181
> # timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=100
> #consumer group id
> group.id=test-consumer-group
> #consumer timeout
> #consumer.timeout.ms=5000
>
> I use defaults for most of parameters, for group.id, it was defined as "A
> string that uniquely identifies the group of consumer processes to which
> this consumer belongs. By setting the same group id multiple processes
> indicate that they are all part of the same consumer group."
>
> Do I need to define many consumer groups here?
> For the producer: we are not using the Java client; it is a C# client
> sending messages to Kafka, so this producer config won't matter (except
> when I am doing producer tests locally), right?
>
> producer.type=sync
> compression.codec=none
>
> Thanks
> --
>
> Alec Li
>


Re: Topic in bad state after controller to broker messaging fails

2015-01-18 Thread Jun Rao
Yes, typically, a bad topic state is the result of communication issues
between the controller and the broker. You may be hitting KAFKA-1738. Could
you try 0.8.2.0 RC1 or the latest 0.8.2 branch? The issue has been fixed
there.

Thanks,

Jun

On Tue, Jan 6, 2015 at 2:17 PM, Henri Pihkala wrote:

> Hi,
>
> I’m hitting a strange problem using 0.8.2-beta and just a single kafka
> broker on CentOS 6.5.
>
> A percentage of my topic-create attempts are randomly failing, leaving
> the new topic in a state in which it cannot be used due to “partition
> doesn’t exist” errors, as seen in server.log below.
>
> In controller.log, it looks like the controller fails to send either the
> "become-leader LeaderAndIsr request” or the "UpdateMetadata request” to the
> broker (which in fact is the same Kafka instance), due to a socket read
> failing (for unknown reason).
>
> My questions:
>
> (1) Is the bad topic state a result of the message not making it from the
> controller to the broker?
>
> (2) Any idea why the socket read might randomly fail? It can’t be a
> network issue since we’re running a single instance.
>
> (3) Shouldn’t the controller try to resend the message?
>
>
>
> controller.log
>
> [2015-01-06 21:31:10,304] INFO [Controller 0]: New topic creation callback
> for [09b1ebac-7036-49fc-aa61-7852808ca241,0]
> (kafka.controller.KafkaController)
>
> [2015-01-06 21:31:10,304] INFO [Controller 0]: New partition creation
> callback for [09b1ebac-7036-49fc-aa61-7852808ca241,0]
> (kafka.controller.KafkaController)
>
> [2015-01-06 21:31:10,304] INFO [Partition state machine on Controller 0]:
> Invoking state change to NewPartition for partitions
> [09b1ebac-7036-49fc-aa61-7852808ca241,0]
> (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,308] INFO [Replica state machine on controller 0]:
> Invoking state change to NewReplica for replicas
> [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0]
> (kafka.controller.ReplicaStateMachine)
>
> [2015-01-06 21:31:10,308] INFO [Partition state machine on Controller 0]:
> Invoking state change to OnlinePartition for partitions
> [09b1ebac-7036-49fc-aa61-7852808ca241,0]
> (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,308] DEBUG [Partition state machine on Controller 0]:
> Live assigned replicas for partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] are: [List(0)]
> (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,309] DEBUG [Partition state machine on Controller 0]:
> Initializing leader and isr for partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] to
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2)
> (kafka.controller.PartitionStateMachine)
>
> [2015-01-06 21:31:10,501] INFO [Replica state machine on controller 0]:
> Invoking state change to OnlineReplica for replicas
> [Topic=09b1ebac-7036-49fc-aa61-7852808ca241,Partition=0,Replica=0]
> (kafka.controller.ReplicaStateMachine)
>
> [2015-01-06 21:31:10,502] WARN [Controller-0-to-broker-0-send-thread],
> Controller 0 fails to send a request to broker
> id:0,host:dev.unifina,port:9092 (kafka.controller.RequestSendThread)
> java.io.EOFException: Received -1 when reading from channel, socket has
> likely been closed.
> at kafka.utils.Utils$.read(Utils.scala:381)
> at
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at
> kafka.network.Receive$class.readCompletely(Transmission.scala:56)
> at
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
> at kafka.network.BlockingChannel.receive(BlockingChannel.scala:108)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:146)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
> [2015-01-06 21:31:10,505] ERROR [Controller-0-to-broker-0-send-thread],
> Controller 0 epoch 2 failed to send request
> Name:UpdateMetadataRequest;Version:0;Controller:0;ControllerEpoch:2;CorrelationId:16;ClientId:id_0-host_dev.unifina-port_9092;AliveBrokers:id:0,host:dev.unifina,port:9092;PartitionState:[09b1ebac-7036-49fc-aa61-7852808ca241,0]
> ->
> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:2),ReplicationFactor:1),AllReplicas:0)
> to broker id:0,host:dev.unifina,port:9092. Reconnecting to broker.
> (kafka.controller.RequestSendThread)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:97)
> at
> kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:132)
> at
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:131)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
>
>
>
>
> state-change.log
>
> [2015-01-06 21:31:10,306] TRACE Controller 0 epoch 2 changed partition
> [09b1ebac-7036-49fc-aa61-7852808ca241,0] state from NonExistentPartition to
> NewPartition with assigned replicas 0 (state.change.l

Re: Offset management in multi-threaded high-level consumer

2015-01-18 Thread Jun Rao
There isn't much difference between options 1 and 2 in terms of the offset
commit overhead on ZooKeeper. In 0.8.2, we will have Kafka-based offset
management, which is much more efficient than committing to ZooKeeper.

Thanks,

Jun

On Tue, Jan 6, 2015 at 10:45 AM, Rafi Shamim  wrote:

> Hello,
>
> I would like to write a multi-threaded consumer using the high-level
> consumer API in Kafka 0.8.1. I have found two ways that seem feasible
> while keeping the guarantee that messages in a partition are processed
> in order. I would appreciate any feedback this list has.
>
> Option 1
> 
> - Create multiple threads, so each thread has its own ConsumerConnector.
> - Manually commit offsets in each thread after every N messages.
> - This was discussed a bit on this list previously. See [1].
>
> ### Questions
> - Is there a problem with making multiple ConsumerConnectors per machine?
> - What does it take for ZooKeeper to handle this much load? We have a
> 3-node ZooKeeper cluster with relatively small machines. (I expect the
> topic will have about 40 messages per second. There will be 3 consumer
> groups. That would be 120 commits per second at most, but I can reduce
> the frequency of commits to make this lower.)
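The commit-rate arithmetic quoted above can be sanity-checked in a few lines (a trivial sketch; it assumes, as the worst case, one commit per message per group, reduced by batching commits every N messages):

```python
def max_commit_rate(msgs_per_sec, consumer_groups, commit_every_n=1):
    """Worst-case offset-commit rate to ZooKeeper if each group commits
    once every `commit_every_n` messages."""
    return msgs_per_sec * consumer_groups / commit_every_n

print(max_commit_rate(40, 3))      # 120.0 commits/sec, matching the estimate
print(max_commit_rate(40, 3, 10))  # 12.0 commits/sec when batching commits
```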
>
> ### Extra info
> Kafka 0.9 will have an entirely different commit API, which will allow
> one connection to commit offsets per partition, but I can’t wait that
> long. See [2].
>
>
> Option 2
> 
> - Create one ConsumerConnector, but ask for multiple streams in that
> connection. Give each thread one stream.
> - Since there is no way to commit offsets per stream right now, we
> need to do autoCommit.
> - This sacrifices the at-least-once processing guarantee, which would
> be nice to have. See KAFKA-1612 [3].
>
> ### Extra info
> - There was some discussion in KAFKA-996 about a markForCommit()
> method so that autoCommit would preserve the at-least-once guarantee,
> but it seems more likely that the consumer API will just be redesigned
> to allow commits per partition instead. See [4].
>
>
> So basically I'm wondering if option 1 is feasible. If not, I'll just
> do option 2. Of course, let me know if I was mistaken about anything
> or if there is another design which is better.
>
> Thanks in advance.
> Rafi
>
> [1]
> http://mail-archives.apache.org/mod_mbox/kafka-users/201310.mbox/%3cff142f6b499ae34caed4d263f6ca32901d35a...@extxmb19.nam.nsroot.net%3E
> [2]
> https://cwiki.apache.org/confluence/display/KAFKA/Client+Rewrite#ClientRewrite-ConsumerAPI
> [3] https://issues.apache.org/jira/browse/KAFKA-1612
> [4] https://issues.apache.org/jira/browse/KAFKA-966
>