Re: Kafka Zookeeper Connection

2018-02-25 Thread Kamal C
You also have to update the property `zookeeper.session.timeout.ms`.
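A rough sketch of the two places involved (the values are illustrative, not
recommendations):

    # Kafka broker, server.properties
    zookeeper.session.timeout.ms=30000

    # ZooKeeper ensemble, zoo.cfg -- must allow the timeout the broker asks for
    # (maxSessionTimeout defaults to 20 * tickTime)
    maxSessionTimeout=60000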

On Sat, Feb 24, 2018 at 11:25 PM, Ted Yu  wrote:

> Please take a look at maxSessionTimeout under:
> http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_advancedConfiguration
>
> On Sat, Feb 24, 2018 at 9:46 AM, Soheil Pourbafrani  >
> wrote:
>
> > Thanks, Manna, Can you say which property (in Kafka or Zookeeper) should
> I
> > increase?
> >
> > On Sat, Feb 24, 2018 at 7:49 PM, M. Manna  wrote:
> >
> > > Have you tried increasing the timeouts for zookeeper nodes and Kafka
> > > brokers to see if they make a difference?
> > >
> > >
> > >
> > > On Sat, 24 Feb 2018 at 14:55, Soheil Pourbafrani <
> soheil.i...@gmail.com>
> > > wrote:
> > >
> > > > Hi and Thanks,
> > > > Excuse me, The Kafka version is 0.11 and Zookeeper version is 3.4.10
> > > >
> > > > I've checked the Zookeeper logs and sessions are expiring and renewing
> > > > continuously there. I use the same Zookeeper cluster for Hadoop HA
> and
> > it
> > > > works well.
> > > >
> > > > I asked the same question with more details at the following link:
> > > >
> > > > Here: <https://stackoverflow.com/questions/48885469/kafka-zookeeper-connection-drop-continuously>
> > > >
> > >
> >
>


Re: Log flush

2017-08-15 Thread Kamal C
If you've noticed the default value of the above configurations, it's
Long.MAX_VALUE.

This is set to discourage users from editing / re-configuring them. The above
configurations control when messages are flushed from the cache to the disk
(fsync). Kafka delegates the task of flushing messages to disk to the OS
kernel. This is done to maintain faster read & write performance.

Kafka doesn't rely on `fsync` for message durability. Instead, it relies on
its own replicated change-logs located on other brokers.
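For reference, a sketch of how this looks in server.properties; the
commented-out lines reflect the defaults described above:

    # leave these commented out to delegate fsync entirely to the OS
    #log.flush.interval.messages=9223372036854775807
    #log.flush.interval.ms=9223372036854775807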

-- Kamal

On Wed, Aug 16, 2017 at 12:45 AM, Jakes John 
wrote:

> During my Kafka installation,  I got some questions with some of the
> parameter configurations
>
>  I see that log.flush.interval.messages and log.flush.interval.ms are
> commented out in the default kafka server properties file. I read two
> conflicting statements about these parameters. In one place, I read that it
> is recommended not to configure these parameters. While in the other, it
> says that flush time should not be huge else it will affect the
> performance.   What is the best configuration and recommended way? When do
> I need to configure these parameters and what is the default behaviour?
>


Re: Kafka Producer Errors

2017-08-14 Thread Kamal C
I think your application (where the producer resides) is facing GC issues.
The time taken for GC might be higher than `request.timeout.ms`.

Check your `jvm.log` and update `request.timeout.ms`. The same property is
applicable to the producer, consumer and broker; increase the config only
for the KafkaProducer.
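A minimal sketch of raising it on the producer side only (the value and the
bootstrap address are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    Properties props = new Properties();
    props.put("bootstrap.servers", "broker1:9092");   // placeholder
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("request.timeout.ms", "60000");         // raised from the 30 s default
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);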

-- Kamal

On Sat, Aug 12, 2017 at 12:19 AM, Saladi Naidu <
naidusp2...@yahoo.com.invalid> wrote:

> Looks like some of the fonts/characters did not come through, am
> re-sending with correct details
> We have recently upgraded our Kafka cluster from 0.8.2 to 0.10.1. As part
> of the upgrade our clients upgraded the client libraries as well. After the
> upgrade we are facing following issue
> org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for
> OM-LOCAL-DC3-REQUISITION-SUBMISSION-PRD-E2-14: 30027 ms has passed since
> last append
>
>
> My understanding is that Producer batches the send requests going to
> topic/partition per leader and sends it once based on following parameters
> linger.ms is time based batching
> batch.size  is size based batching
>
> We had batch.size as 16K (default) and linger.ms 0 - We expected producer
> to send requests right away even if the batch size is not met because
> linger.ms is 0.
>
> Our request.timeout.ms 30 seconds, looking at the error, it looks like
> requests are kept in the sender queue longer and when the Sender polling
> thread sees the requests are older, it is expiring them out.
> I have tried following multiple combinations and none seem to work
>
> linger.ms 0 and batch.size < 16k
> linger.ms 0 and batch.size 0k
> linger.ms 1000 and batch.size 0
>
> Any clue on why linger.ms 0 or batch.size 0 is not taken into
> consideration ???  Naidu Saladi


Re: Kafka shutdown gracefully

2017-07-06 Thread Kamal C
Don't use `kill -9 PID`. Use `kill -s TERM PID`, which sends a signal asking
the process to end and triggers any cleanup routines before exiting.

kafka-server-stop.sh shows "No kafka server to stop" because the output of the
`ps` command it uses exceeds 4096 characters, so it fails to find the broker
process.
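A rough workaround sketch until the script behaves (the grep pattern is an
assumption about how the broker shows up in ps):

    PID=$(ps ax | grep -i 'kafka\.Kafka' | grep java | grep -v grep | awk '{print $1}')
    kill -s TERM "$PID"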

On Thu, Jul 6, 2017 at 3:25 AM, Ghosh, Achintya (Contractor) <
achintya_gh...@comcast.com> wrote:

> Hi team,
>
> What is the command to shutdown kafka server gracefully instead of using
> 'kill -9 PID'?
>
> If we use bin/kafka-server-stop.sh it shows "No kafka server to stop" but
> the service actually running and I see the PID by using "ps -ef|grep kafka"
>
>
> Thanks
> Achintya
>


Re: Kafka Producer - Multiple broker - Data sent to buffer but not in Queue

2017-04-19 Thread Kamal C
> bootstrap.servers =  , 

Is your bootstrap.servers configuration correct? You have specified
port `9091`, but are running the GetOffsetShell command against `9094`.
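For example, if the producer really bootstraps against 9091, the offsets check
should point at the same listener (command and port are illustrative):

    kafka-run-class.bat kafka.tools.GetOffsetShell --broker-list localhost:9091 --topic TEST --time -1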





On Wed, Apr 19, 2017 at 11:58 AM, Ranjith Anbazhakan <
ranjith.anbazha...@aspiresys.com> wrote:

> Unfortunately, there is no specific information (error/exception) in the
> logs when the buffer to queue records data goes missing i.e) When stopped
> broker (say broker 2) is started and followed by stopping current running
> broker that received all producer sent records in buffer (say broker 1).
>
> Thanks,
> Ranjith
>
> -Original Message-
> From: David Garcia [mailto:]
> Sent: Wednesday, April 19, 2017 09:31
> To: users@kafka.apache.org
> Subject: Re: Kafka Producer - Multiple broker - Data sent to buffer but
> not in Queue
>
> What do broker logs say around the time you send your messages?
>
> On 4/18/17, 3:21 AM, "Ranjith Anbazhakan"  com> wrote:
>
> Hi,
>
> I have been testing behavior of multiple broker instances of kafka in
> same machine and facing inconsistent behavior of producer sent records to
> buffer not being available in queue always.
>
> Tried kafka versions:
> 0.10.2.0
> 0.10.1.0
>
> Scenario:
>
> 1.   Ran two broker instances in same machine. Say broker 1 as
> initial leader, broker 2 as initial follower.
>
> 2.   Stopped broker 1. Now broker 2 became leader.
>
> 3.   Now producer sends records for a given topic TEST through
> send() method, followed by flush(). Records have to go to Broker 2
> logically. No error/exception is thrown by code. (So it is assumed data has
> been sent successfully to buffer)
>
> 4.   When using command to check the records count for TEST topic
> in Broker 2, the sent records are not added to existing records count for
> that topic in queue.
>
> a.   Used command - kafka-run-class.bat kafka.tools.GetOffsetShell
> --broker-list localhost:9094 --topic TEST --time -1 (where TEST is the used
> topic)
>
> NOTE: **Step 4 is not happening always and is inconsistent**. In the
> scenario when it does not work, if Broker 1 is made UP and then made DOWN,
> records are always been available in queue in Broker 2 post doing Step 3.
>
> Configurations:
> Overall Producer configurations: (most are default values)
> acks = all
> batch.size = 16384
> block.on.buffer.full = false
> bootstrap.servers =  ,
> 
> buffer.memory = 33554432
> client.id = producer-1
> compression.type = none
> connections.max.idle.ms = 54
> interceptor.classes = null
> key.serializer = class org.apache.kafka.common.
> serialization.StringSerializer
> linger.ms = 1
> max.block.ms = 6
> max.in.flight.requests.per.connection = 5
> max.request.size = 1048576
> metadata.fetch.timeout.ms = 6
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 3
> partitioner.class = class org.apache.kafka.clients.
> producer.internals.DefaultPartitioner
> receive.buffer.bytes = 32768
> reconnect.backoff.ms = 50
> request.timeout.ms = 3
> retries = 0
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> timeout.ms = 3
> value.serializer = class 

Re: Apache Kafka integration using Apache Camel

2017-01-09 Thread Kamal C
Can you enable DEBUG logs? They'll be helpful for debugging.
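For example, a minimal log4j snippet, assuming the application uses log4j 1.x
(the logger names are the usual Camel / Kafka client package roots):

    log4j.logger.org.apache.camel.component.kafka=DEBUG
    log4j.logger.org.apache.kafka.clients.consumer=DEBUG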

-- Kamal

On Mon, Jan 9, 2017 at 5:37 AM, Gupta, Swati  wrote:

> Hello All,
>
> Any help on this would be appreciated.
> There seems to be no error. Does it look like a version issue?
>
> I have updated my pom.xml with the below:
> <dependency>
>     <groupId>org.springframework.kafka</groupId>
>     <artifactId>spring-kafka</artifactId>
>     <version>1.1.2.BUILD-SNAPSHOT</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.camel</groupId>
>     <artifactId>camel-kafka</artifactId>
>     <version>2.17.0</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka-clients</artifactId>
>     <version>0.10.1.0</version>
> </dependency>
> <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.11</artifactId>
>     <version>0.10.1.0</version>
> </dependency>
>
> <dependency>
>     <groupId>org.apache.camel</groupId>
>     <artifactId>camel-core</artifactId>
>     <version>2.17.0</version>
> </dependency>
>
> Thanks & Regards
> Swati
>
> -Original Message-
> From: Gupta, Swati [mailto:swati.gu...@anz.com]
> Sent: Friday, 6 January 2017 4:01 PM
> To: users@kafka.apache.org
> Subject: RE: Apache Kafka integration using Apache Camel
>
> Yes, the kafka console consumer displays the message correctly.
> I also tested the same with a Java application, it works fine. There seems
> to be an issue with Camel route trying to consume.
>
> There is no error in the console. But, the logs show as below:
> kafka.KafkaCamelTestConsumer
> Connected to the target VM, address: '127.0.0.1:65007', transport:
> 'socket'
> PID_IS_UNDEFINED: INFO  DefaultCamelContext - Apache Camel 2.17.0
> (CamelContext: camel-1) is starting
> PID_IS_UNDEFINED: INFO  ManagedManagementStrategy - JMX is enabled
> PID_IS_UNDEFINED: INFO  DefaultTypeConverter - Loaded 183 type converters
> PID_IS_UNDEFINED: INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint
> registry is in extended mode gathering usage statistics of all incoming and
> outgoing endpoints (cache limit: 1000)
> PID_IS_UNDEFINED: INFO  DefaultCamelContext - AllowUseOriginalMessage is
> enabled. If access to the original message is not needed, then its
> recommended to turn this option off as it may improve performance.
> PID_IS_UNDEFINED: INFO  DefaultCamelContext - StreamCaching is not in use.
> If using streams then its recommended to enable stream caching. See more
> details at http://camel.apache.org/stream-caching.html
> PID_IS_UNDEFINED: INFO  KafkaConsumer - Starting Kafka consumer
> PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [localhost:9092]
> check.crcs = true
> client.id =
> connections.max.idle.ms = 54
> enable.auto.commit = true
> exclude.internal.topics = true
> fetch.max.bytes = 52428800
> fetch.max.wait.ms = 500
> fetch.min.bytes = 1024
> group.id = testing
> heartbeat.interval.ms = 3000
> interceptor.classes = null
> key.deserializer = class org.apache.kafka.common.serialization.
> StringDeserializer
> max.partition.fetch.bytes = 1048576
> max.poll.interval.ms = 30
> max.poll.records = 500
> metadata.max.age.ms = 30
> metric.reporters = []
> metrics.num.samples = 2
> metrics.sample.window.ms = 3
> partition.assignment.strategy = [org.apache.kafka.clients.
> consumer.RangeAssignor]
> receive.buffer.bytes = 32768
> reconnect.backoff.ms = 50
> request.timeout.ms = 4
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.min.time.before.relogin = 6
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> sasl.kerberos.ticket.renew.window.factor = 0.8
> sasl.mechanism = GSSAPI
> security.protocol = PLAINTEXT
> send.buffer.bytes = 131072
> session.timeout.ms = 3
> ssl.cipher.suites = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.endpoint.identification.algorithm = null
> ssl.key.password = null
> ssl.keymanager.algorithm = SunX509
> ssl.keystore.location = null
> ssl.keystore.password = null
> ssl.keystore.type = JKS
> ssl.protocol = TLS
> ssl.provider = null
> ssl.secure.random.implementation = null
> ssl.trustmanager.algorithm = PKIX
> ssl.truststore.location = null
> ssl.truststore.password = null
> ssl.truststore.type = JKS
> value.deserializer = class org.apache.kafka.common.serialization.
> StringDeserializer
>
> PID_IS_UNDEFINED: INFO  ConsumerConfig - ConsumerConfig values:
> auto.commit.interval.ms = 5000
> auto.offset.reset = earliest
> bootstrap.servers = [localhost:9092]
> check.crcs = true
> client.id = consumer-1
> 

Re: programmatic way to check for topic existence?

2016-10-25 Thread Kamal C
Ben,

You can list all the available topic information and do a simple look up
from the returned list.

Map<String, List<PartitionInfo>> topics = consumer.listTopics();
topics.containsKey(topic) - isn't that enough?

-- Kamal
On 25 Oct 2016 22:56, "Ben Osheroff"  wrote:

> We won't proceed in the face of a missing table, we'll just crash.  But
> it's still a bad experience for us and the end user; we have to guess
> that maybe a TimeoutException means a missing topic, and we also have to
> wait the N seconds (default 60) for the thing to "timeout".
>
> On Tue, Oct 25, 2016 at 12:14:46AM -0700, Andy Chambers wrote:
> > You could just catch the exception but if this is per row, that is
> probably
> > prohibitively expensive.
> >
> > Doesn't the binlog get "create table" events? Wouldn't that be a better
> > time to create the topic?
> >
> > --
> > Andy
> >
> > On Mon, Oct 24, 2016 at 2:32 PM, Ben Osheroff 
> > wrote:
> >
> > > Hiya!
> > >
> > > I've been trying to merge https://protect-us.mimecast.
> com/s/ANVVBZU83nznf9,
> > > which adds a much-requested feature of Maxwell, that of being able to
> > > have a topic-per-mysql-table.  When we receive a row we
> programmatically
> > > generate the topic name, and the first thing we do is call
> > > `KafkaProducer#partitionsFor(topic)`, so that we know how to partition
> > > the data.
> > >
> > > The problem I'm running into is in trying to detect the case where a
> > > topic doesn't exist.  If auto-creation is on, `partitionsFor()` seems
> to
> > > correctly auto-create the topic, but if auto-creation is off the
> > > behavior is kinda wonky; kafka goes into a metadata-fetch loop, logging
> > >
> > > "Error while fetching metadata with correlation id 573
> > > :{topic=UNKNOWN_TOPIC_OR_PARTITION}"
> > >
> > > but then ultimately throwing me back a `TimeoutException` after 60
> tries
> > > or so.
> > >
> > > I can rescue/rethrow the TimeoutException, but it seems like there
> might
> > > be a better way that I'm missing.  Any ideas?  I'd ideally just like a
> > > way to fail fast and clean when the topic doesn't exist (and
> > > auto-creation is off).
> > >
> > > Thanks,
> > > Ben Osheroff
> > > zendesk.com
> > >
> > >
> > >
> > >
>


Re: How to keep consumers alive without polling new messages

2016-09-28 Thread Kamal C
You can refer this example[1]

[1]:
https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java
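The key part is the pause / resume pattern described below; a rough sketch,
assuming a 0.10.x consumer where pause() and resume() take a collection of
partitions, and where processingDone() is a hypothetical completion check:

    Set<TopicPartition> assigned = consumer.assignment();
    consumer.pause(assigned);
    while (!processingDone()) {
        consumer.poll(100);   // keeps heartbeating; returns no records while paused
    }
    consumer.resume(assigned);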

- Kamal

On Wed, Sep 28, 2016 at 11:33 AM, Vincent Dautremont <
vincent.dautrem...@olamobile.com> wrote:

> I had the same problem :
> Call pause() on all partitions.
> Then continue your loop that calls consume(), it will then poll without
> consuming messages.
>
> When you want to consume again, call resume() on all partition
>
> It's not obvious at all, the doc should explain that in the documentation
> of method consume() and put it in red and bold.
> https://kafka.apache.org/090/javadoc/index.html?org/apache/
> kafka/clients/consumer/KafkaConsumer.html
>
> > Le 28 sept. 2016 à 06:21, Yifan Ying  a écrit :
> >
> > Hi all,
> >
> > 0.10 consumers use poll() method to heartbeat Kafka brokers. Is there any
> > way that I can make the consumer heartbeat but not poll any messages? The
> > javadoc says, the recommended way is to move message processing to
> another
> > thread. But when message processing keeps failing(because a third party
> > service goes down for a while), the thread that actually processes
> messages
> > could have too many messages accumulated. Maybe re-sending failed
> messages
> > to another queue(IMQ) and re-processing them later is a good option?
> >
> > Thanks!
> > --
> > Yifan
>


Re: producer can't push msg sometimes with 1 broker recoved

2016-09-27 Thread Kamal C
Aggie,

I'm not able to reproduce your behavior in 0.10.0.1.

> I did more testing and find the rule (Topic is created with
"--replication-factor 2 --partitions 1" in following case):
> node 1   node 2
> down(lead)   down (replica)
> down(replica) up   (lead)  producer send fail !!!

When node 2 is up, after the metadata update the producer is able to connect
and send messages to it.

Logs:

[2016-09-27T15:18:17,907] NetworkClient: handleDisconnections(): Node 1
disconnected.
[2016-09-27T15:18:18,007] NetworkClient: initiateConnect(): Initiating
connection to node 1 at localhost:9093.
[2016-09-27T15:18:18,008] Selector: pollSelectionKeys(): Connection with
localhost/127.0.0.1 disconnected
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
~[?:1.8.0_45]
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
~[?:1.8.0_45]
at
org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:51)
~[kafka-clients-0.10.0.1.jar:?]
at
org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:73)
~[kafka-clients-0.10.0.1.jar:?]
at
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:309)
[kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
[kafka-clients-0.10.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
[kafka-clients-0.10.0.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:229)
[kafka-clients-0.10.0.1.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:134)
[kafka-clients-0.10.0.1.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_45]
[2016-09-27T15:18:18,008] NetworkClient: handleDisconnections(): Node 1
disconnected.
[2016-09-27T15:18:18,043] NetworkClient: maybeUpdate(): Sending metadata
request {topics=[hello]} to node 0
[2016-09-27T15:18:18,052] Metadata: update(): Updated cluster metadata
version 4 to Cluster(nodes = [tcltest1.nmsworks.co.in:9092 (id: 0 rack:
null)], partitions = [Partition(topic = hello, partition = 0, leader =
none, replicas = [0,1,], isr = []])
[2016-09-27T15:18:19,053] NetworkClient: maybeUpdate(): Sending metadata
request {topics=[hello]} to node 0
[2016-09-27T15:18:19,056] Metadata: update(): Updated cluster metadata
version 5 to Cluster(nodes = [tcltest1.nmsworks.co.in:9092 (id: 0 rack:
null)], partitions = [Partition(topic = hello, partition = 0, leader = 0,
replicas = [0,1,], isr = [0,]])
[2016-09-27T15:18:19,081] KafkaProducer: main(): Batch : 4 sent
[2016-09-27T15:18:19,182] KafkaProducer: main(): Batch : 5, Sending the
record with key : 0

- Kamal

On Mon, Sep 26, 2016 at 8:53 AM, FEI Aggie <aggie@alcatel-lucent.com>
wrote:

> Kamal,
> Thanks for your response. I tried testing with metadata.max.age.ms
> reduced to 10s, but the behavior did not change, and the producer still
> can't find the live broker.
>
> I did more testing and find the rule (Topic is created with
> "--replication-factor 2 --partitions 1" in following case):
> node 1   node 2
> down(lead)   down (replica)
> down(replica) up   (lead)  producer send fail !!!
>
>

> down(lead)   down (replica)
> up  (lead)   down (replica) producer send ok !!!
>
> If the only node with original lead partition up, everything is fine.
> If the only node with original replica partition up, producer can't
> connect to broker alive (always try to connect to the original lead broker,
> node 1 in my case).
>
> Kafka can't recover for this situation? Anyone has clue for this?
>
> Thanks!
> Aggie
> -Original Message-
> From: Kamal C [mailto:kamaltar...@gmail.com]
> Sent: Saturday, September 24, 2016 1:37 PM
> To: users@kafka.apache.org
> Subject: Re: producer can't push msg sometimes with 1 broker recoved
>
> Reduce the metadata refresh interval 'metadata.max.age.ms' from 5 min to
> your desired time interval.
> This may reduce the time window of non-availability broker.
>
> -- Kamal
>


Re: producer can't push msg sometimes with 1 broker recoved

2016-09-23 Thread Kamal C
Reduce the metadata refresh interval 'metadata.max.age.ms' from 5 minutes to
your desired time interval.
This may reduce the time window during which the broker appears unavailable
to the producer.
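A sketch of the producer-side setting (the value is illustrative; the default
is 5 minutes):

    metadata.max.age.ms=10000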

-- Kamal


Re: Kafka consumers in cluster

2016-08-04 Thread Kamal C
Yes, it gets called on every re-balance.

-- Kamal

On Thu, Aug 4, 2016 at 11:24 PM, sat  wrote:

> Hi Kamal,
>
> Thanks for your prompt response. Does our custom partition assignor gets
> called during every rebalancing.
>
> Thanks and Regards
> A.SathishKumar
>
>
>
> >Implement your own custom
> >`org.apache.kafka.clients.consumer.internals.PartitionAssignor`
> >and assign all the subscribed partitions to the first consumer instance in
> >the group.
>
> >See 'partition.assignment.strategy' config in the consumer configs [1]
>
> >[1]: http://kafka.apache.org/documentation.html#newconsumerconfigs
>
>
>
> >On Thu, Aug 4, 2016 at 8:54 AM, sat  wrote:
>
> > Hi,
> >
> > We have Kafka server/broker running in a seperate machine (say machine
> A),
> > for now we are planning to have in one node. We have multiple topics and
> > all topics have only 1 partition for now.
> >
> > We have our application which includes Kafka consumers installed in
> machine
> > B and machine C. Our application in machine B and C are in cluster, hence
> > Kafka Consumers will also be in cluster. Both our consumers will have
> same
> > group id. We want all the messages to be consumed by consumer in machine
> B
> > and only when machine B is down consumer in machine C should pull
> messages.
> >
> > Since consumer in machine B and C have same group id, we came to know
> > consumer in machine B will get message for some time duration (10mins)
> and
> > then consumer in machine C will get message for some time duration. Since
> > our consumers are in cluster, we want only consumer to be active or
> receive
> > all the messages as long as it is alive.
> >
> > Please let us know how to achieve this.
> >
> >
> > Thanks and Regards
> > A.SathishKumar
> > 044-24735023
> >
>
>
>
> On Wed, Aug 3, 2016 at 8:24 PM, sat  wrote:
>
> > Hi,
> >
> > We have Kafka server/broker running in a seperate machine (say machine
> A),
> > for now we are planning to have in one node. We have multiple topics and
> > all topics have only 1 partition for now.
> >
> > We have our application which includes Kafka consumers installed in
> > machine B and machine C. Our application in machine B and C are in
> cluster,
> > hence Kafka Consumers will also be in cluster. Both our consumers will
> have
> > same group id. We want all the messages to be consumed by consumer in
> > machine B and only when machine B is down consumer in machine C should
> pull
> > messages.
> >
> > Since consumer in machine B and C have same group id, we came to know
> > consumer in machine B will get message for some time duration (10mins)
> and
> > then consumer in machine C will get message for some time duration. Since
> > our consumers are in cluster, we want only consumer to be active or
> receive
> > all the messages as long as it is alive.
> >
> > Please let us know how to achieve this.
> >
> >
> > Thanks and Regards
> > A.SathishKumar
> > 044-24735023
> >
>
>
>
> --
> A.SathishKumar
> 044-24735023
>


Re: Kafka consumers in cluster

2016-08-04 Thread Kamal C
Implement your own custom
`org.apache.kafka.clients.consumer.internals.PartitionAssignor`
and assign all the subscribed partitions to the first consumer instance in
the group.

See 'partition.assignment.strategy' config in the consumer configs [1]

[1]: http://kafka.apache.org/documentation.html#newconsumerconfigs
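A rough, untested sketch of such an assignor, assuming the 0.9.x client
internals (AbstractPartitionAssignor); the class name and strategy name are
made up for illustration:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeSet;
    import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
    import org.apache.kafka.common.TopicPartition;

    public class FirstConsumerAssignor extends AbstractPartitionAssignor {

        @Override
        public String name() {
            return "first-consumer";
        }

        @Override
        public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                        Map<String, Subscription> subscriptions) {
            // start every member with an empty assignment
            Map<String, List<TopicPartition>> assignment = new HashMap<>();
            for (String memberId : subscriptions.keySet())
                assignment.put(memberId, new ArrayList<TopicPartition>());

            // pick the "first" member deterministically and give it all partitions
            String first = new TreeSet<>(subscriptions.keySet()).first();
            for (Map.Entry<String, Integer> entry : partitionsPerTopic.entrySet())
                for (int p = 0; p < entry.getValue(); p++)
                    assignment.get(first).add(new TopicPartition(entry.getKey(), p));
            return assignment;
        }
    }

Every consumer in the group would then set partition.assignment.strategy to
the fully-qualified class name of the assignor above.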



On Thu, Aug 4, 2016 at 8:54 AM, sat  wrote:

> Hi,
>
> We have Kafka server/broker running in a seperate machine (say machine A),
> for now we are planning to have in one node. We have multiple topics and
> all topics have only 1 partition for now.
>
> We have our application which includes Kafka consumers installed in machine
> B and machine C. Our application in machine B and C are in cluster, hence
> Kafka Consumers will also be in cluster. Both our consumers will have same
> group id. We want all the messages to be consumed by consumer in machine B
> and only when machine B is down consumer in machine C should pull messages.
>
> Since consumer in machine B and C have same group id, we came to know
> consumer in machine B will get message for some time duration (10mins) and
> then consumer in machine C will get message for some time duration. Since
> our consumers are in cluster, we want only consumer to be active or receive
> all the messages as long as it is alive.
>
> Please let us know how to achieve this.
>
>
> Thanks and Regards
> A.SathishKumar
> 044-24735023
>


Re: Kafka Consumer poll

2016-08-02 Thread Kamal C
See the answers inline.

On Tue, Aug 2, 2016 at 12:23 AM, sat  wrote:

> Hi,
>
> I am new to Kafka. We are planning to use Kafka messaging for our
> application. I was playing with Kafka 0.9.0.1 version and i have following
> queries. Sorry for asking basic questions.
>
>
> 1) I have instantiated Kafka Consumer and invoked
> consumer.poll(Long.MAX_VALUE). Although i have specified timeout as
> Long.MAX_VALUE, i observe my consumer to fetch records whenever the
> publisher publishes a message to a topic. This makes me wonder whether
> Kafka Consumer is push or pull mechanism. Please help us understand the
> logic of consumer.poll(timeout).
>

poll() fetches data from the topic, waiting up to the specified wait time if
necessary for a record to become available. The Kafka consumer is, by design,
a pull mechanism.

Take a look into Kafka Consumer java docs[1]. It's explained in detail.


> 2) What are the pros and cons of poll for long timeout vs short timeout.
>
> Short Timeout

Pros:
- On shutdown, if no data available in the topic -- Shutdown will be quick

Cons:
- Number of network trips will be high
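
For illustration, a bare-bones poll loop with a short timeout (the handler is
a placeholder):

    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            handle(record);   // hypothetical per-record handler
        }
    }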


>
> Thanks and Regards
> A.SathishKumar
>

[1]:
https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

-- Kamal


Fwd: Questions about Kafka Scripts

2016-06-08 Thread Kamal C
Hi,

A. What is the usage of the `kafka-replica-verification.sh` script? I can't
find any documentation about it in [1] or [2].

I have a topic `test` with 10 partitions. Ran the above script, it
continuously prints the below results.

[kamal@tcltest1 bin]$ sh kafka-replica-verification.sh --time -2
--topic-white-list test --report-interval-ms 1000 --broker-list
localhost:9092
2016-06-07 12:49:13,664: verification process is started.
2016-06-07 12:49:14,632: max lag is 0 for partition [test,6] at offset
39008 among 10 partitions
2016-06-07 12:49:15,633: max lag is 0 for partition [test,6] at offset
39008 among 10 partitions
2016-06-07 12:49:16,634: max lag is 0 for partition [test,6] at offset
39008 among 10 partitions
2016-06-07 12:49:17,635: max lag is 0 for partition [test,6] at offset
39008 among 10 partitions
2016-06-07 12:49:18,636: max lag is 0 for partition [test,6] at offset
39008 among 10 partitions
2016-06-07 12:49:19,637: max lag is 0 for partition [test,6] at offset
39008 among 10 partitions

B. `kafka-replay-log-producer.sh` script throws ConsumerTimeoutException.
What is wrong with the below command ?

[kamal@tcltest1 bin]$ sh kafka-replay-log-producer.sh --broker-list
localhost:9092 --inputtopic test --outputtopic xyz --reporting-interval
1000 --zookeeper localhost:2181
[2016-06-07 12:42:10,925] ERROR consumer thread timing out
(kafka.tools.ReplayLogProducer$ZKConsumerThread)
kafka.consumer.ConsumerTimeoutException
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69)
at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33)
at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at kafka.consumer.KafkaStream.foreach(KafkaStream.scala:25)
at
kafka.tools.ReplayLogProducer$ZKConsumerThread.run(ReplayLogProducer.scala:140)


[1]: http://kafka.apache.org/documentation.html
[2]: https://cwiki.apache.org/confluence/display/KAFKA/System+Tools


Re: newbie: kafka 0.9.0.0 producer does not terminate after producer.close()

2016-05-22 Thread Kamal C
Andy,

Kafka 0.9.0 brokers support the previous versions of the clients (0.8.2,
0.8.1, ...), but new clients won't work properly against an older version of
the Kafka server.

You should upgrade your server / broker first.

--Kamal

On Fri, May 20, 2016 at 10:58 PM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi Jaikiran
>
> Bellow is the stack trace. For completeness I see in my log file that my
> code has called
>
> producer.flush();
>
> producer.close();
>
>
>
> I get the following error, how ever I do not think this is the problem. I
> found a ??bug report?? That said this was because I was connecting to a
> 0.8x
> sever. I am able to consume my test messages using
> kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh
>
> Kind regards
>
> Andy
>
> ERROR 17:12:14 kafka-producer-network-thread | producer-1
> o.a.k.c.p.i.Sender
> run line:130 Uncaught error in kafka producer I/O thread:
>
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field
> 'throttle_time_ms': java.nio.BufferUnderflowException
>
> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:71)
>
> at
>
> org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient
> .java:464)
>
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:279)
>
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> $ jstack 908
>
> 2016-05-20 10:16:25
>
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.66-b17 mixed mode):
>
>
>
> "Attach Listener" #12 daemon prio=9 os_prio=31 tid=0x7fe04291c800
> nid=0x130b waiting on condition [0x]
>
>java.lang.Thread.State: RUNNABLE
>
>
>
> "kafka-producer-network-thread | producer-1" #11 daemon prio=5 os_prio=31
> tid=0x7fe041116800 nid=0x5a0f runnable [0x715d5000]
>
>java.lang.Thread.State: RUNNABLE
>
> at sun.nio.ch.KQueueArrayWrapper.kevent0(Native Method)
>
> at sun.nio.ch.KQueueArrayWrapper.poll(KQueueArrayWrapper.java:198)
>
> at sun.nio.ch.KQueueSelectorImpl.doSelect(KQueueSelectorImpl.java:103)
>
> at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>
> - locked <0x00076b67ea88> (a sun.nio.ch.Util$2)
>
> - locked <0x00076b67ea00> (a java.util.Collections$UnmodifiableSet)
>
> - locked <0x00076b67e740> (a sun.nio.ch.KQueueSelectorImpl)
>
> at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>
> at org.apache.kafka.common.network.Selector.select(Selector.java:425)
>
> at org.apache.kafka.common.network.Selector.poll(Selector.java:254)
>
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:270)
>
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:216)
>
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:128)
>
> at java.lang.Thread.run(Thread.java:745)
>
>
>
> "Service Thread" #9 daemon prio=9 os_prio=31 tid=0x7fe042015800
> nid=0x5203 runnable [0x]
>
>java.lang.Thread.State: RUNNABLE
>
>
>
> "C1 CompilerThread3" #8 daemon prio=9 os_prio=31 tid=0x7fe04285b000
> nid=0x5003 waiting on condition [0x]
>
>java.lang.Thread.State: RUNNABLE
>
>
>
> "C2 CompilerThread2" #7 daemon prio=9 os_prio=31 tid=0x7fe04282f000
> nid=0x4e03 waiting on condition [0x]
>
>java.lang.Thread.State: RUNNABLE
>
>
>
> "C2 CompilerThread1" #6 daemon prio=9 os_prio=31 tid=0x7fe041830800
> nid=0x4c03 waiting on condition [0x]
>
>java.lang.Thread.State: RUNNABLE
>
>
>
> "C2 CompilerThread0" #5 daemon prio=9 os_prio=31 tid=0x7fe04201c800
> nid=0x4a03 waiting on condition [0x]
>
>java.lang.Thread.State: RUNNABLE
>
>
>
> "Signal Dispatcher" #4 daemon prio=9 os_prio=31 tid=0x7fe042015000
> nid=0x3e0f runnable [0x]
>
>java.lang.Thread.State: RUNNABLE
>
>
>
> "Finalizer" #3 daemon prio=8 os_prio=31 tid=0x7fe04200d800 nid=0x3803
> in
> Object.wait() [0x70d3a000]
>
>java.lang.Thread.State: WAITING (on object monitor)
>
> at java.lang.Object.wait(Native Method)
>
> - waiting on <0x00076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
>
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
>
> - locked <0x00076ab070b8> (a java.lang.ref.ReferenceQueue$Lock)
>
> at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
>
> at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
>
>
>
> "Reference Handler" #2 daemon prio=10 os_prio=31 tid=0x7fe04200d000
> nid=0x3603 in Object.wait() [0x70c37000]
>
>java.lang.Thread.State: WAITING (on object monitor)
>
> at java.lang.Object.wait(Native Method)
>
> - waiting on <0x00076ab06af8> (a java.lang.ref.Reference$Lock)
>
> at java.lang.Object.wait(Object.java:502)
>
> at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:157)
>
> - locked <0x00076ab06af8> (a 

Re: What makes a message key mandatory and how to turn it off?

2016-05-04 Thread Kamal C
Yes. Use `log.cleanup.policy=delete` if you don't want to compact topics.

Reference:
https://cwiki.apache.org/confluence/display/KAFKA/Log+Compaction
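If only some topics need compaction, cleanup.policy can also be overridden per
topic; a sketch reusing the creation command from the original mail:

    /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --topic <topic name> \
        --create --replication-factor 2 --partitions 1 --config cleanup.policy=delete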

On Wed, May 4, 2016 at 3:24 PM, I PVP <i...@hotmail.com> wrote:

> Kamal,
>
> Could the log.cleanup.policy=compact on server.properties  be the
> cause ?
>
> # /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --topic
> user_track --describe
> Topic:user_track PartitionCount:1 ReplicationFactor:2 Configs:
> Topic: user_track Partition: 0 Leader: 1003 Replicas: 1003,1002 Isr:
> 1003,1002
>
>
>
> Thanks
> --
> IPVP
>
>
> From: Kamal C <kamaltar...@gmail.com>
> Reply-To: users@kafka.apache.org
> Date: May 4, 2016 at 6:34:34 AM
> To: users@kafka.apache.org
> Subject: Re: What makes a message key mandatory and how to turn it off?
>
> Can you describe your topic configuration using the below command ?
>
> sh kafka-topics.sh --zookeeper localhost:2181 --topic <topic name> --describe
>
> Key for a record is mandatory only for compacted topics.
>
> --Kamal
>
> On Wed, May 4, 2016 at 2:25 PM, I PVP <i...@hotmail.com> wrote:
>
> > HI all,
> >
> > What makes a message key mandatory and how to turn it off ?
> >
> > I am migrating the messaging piece of a java application from activemq to
> > kafka.
> > The application was publishing messages to kafka(0.9.0) with no issues
> > while running on single broker on my dev machine.
> > After turning it into multi-broker, by enabling 2 additional brokers, now
> > I am seeing error messages on all producers saying that "Error when
> sending
> > message to topic  with key: null". The error happens on
> > producing messages from Java client and for kafka-console-producer.sh
> too.
> >
> > As a newbie I am not doing anything fancy around partitions, key and etc.
> >
> > The topics were being create with the following:
> > /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --topic  > name> --create
> >
> > After going multi-broker I was forced to define partitions because the
> > command above starting saying "Missing required argument "[partitions]” ,
> > so the all topics are being create with the following:
> >
> > /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --topic  > name> --create --replication-factor 2 -partitions 1
> >
> >
> > Messages were being sent by :
> > ...
> > ProducerRecord<String, Object> producerRecord = new
> ProducerRecord<String,
> > Object>(topicName, message);
> > producer.send(producerRecord);
> > ...
> >
> > To make things working again after the multi-broker change and the sunden
> > mandatory message Key requirement I change the code to use a ramdonUUID
> as
> > the message key.
> > ...
> > ProducerRecord<String, Object> producerRecord = new
> ProducerRecord<String,
> > Object>(topicName,UUID.randomUUID().toString(), message);
> >
> > producer.send(producerRecord);
> > ...
> >
> > After the change to multi-broker all topics were deleted, all log files
> > were deleted , zookeeper entries were cleaned and topics were recreated.
> >
> > Even now working fine from the java client with the UUID as the message
> > key it does not work from the command line, it looks like from the
> > command line there is no way to set the message key and the command line
> is
> > critical when needed to do quick tests.
> >
> > If none logic at the applications consuming the messages requires a key
> > why is it forcing to set a key ? The applications consuming the messages
> > only need to consume the message as they come to the topic.
> >
> > Is there a way to turn off the mandatory message key on a multi-broker
> > situation?
> >
> > Thanks
> >
> > --
> > IPVP
> >
> >
>


Re: What makes a message key mandatory and how to turn it off?

2016-05-04 Thread Kamal C
Can you describe your topic configuration using the below command ?

sh kafka-topics.sh --zookeeper localhost:2181 --topic <topic name> --describe

Key for a record is mandatory only for compacted topics.

--Kamal

On Wed, May 4, 2016 at 2:25 PM, I PVP  wrote:

> HI all,
>
> What makes a message key mandatory and how to turn it off ?
>
> I am migrating the messaging piece of a java application from activemq to
> kafka.
> The application was publishing messages to kafka(0.9.0) with no issues
> while running on single broker on my dev machine.
> After turning it into multi-broker, by enabling 2 additional brokers, now
> I am seeing error messages on all producers saying that "Error when sending
> message to topic  with key: null". The error happens on
> producing messages from Java client and for kafka-console-producer.sh too.
>
> As a newbie I am not doing anything fancy around partitions, key and etc.
>
> The topics were being created with the following:
> /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --topic <topic name> --create
>
> After going multi-broker I  was forced to define partitions because the
> command above starting saying "Missing required argument "[partitions]” ,
> so the all  topics are being create with the following:
>
> /opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --topic <topic name> --create --replication-factor 2 -partitions 1
>
>
> Messages were being sent by :
> ...
> ProducerRecord<String, Object> producerRecord = new ProducerRecord<String, Object>(topicName, message);
> producer.send(producerRecord);
> ...
>
> To make things working  again after the multi-broker change and the sunden
> mandatory message Key requirement  I change the code to use a ramdonUUID as
> the message key.
> ...
> ProducerRecord<String, Object> producerRecord = new ProducerRecord<String, Object>(topicName, UUID.randomUUID().toString(), message);
>
> producer.send(producerRecord);
> ...
>
> After the change to multi-broker all topics were deleted, all log files
> were deleted , zookeeper entries were cleaned and topics were recreated.
>
> Even now working fine from the java client with the UUID as the message
> key  it does not work from the command line,  it looks like from the
> command line there is no way to set the message key and the command line is
> critical when needed to do quick tests.
>
> If none logic at the applications consuming the messages requires a key
> why  is it forcing to set a key ? The applications consuming the messages
> only need to consume the message as they come to the topic.
>
> Is there a way to turn off the mandatory message key on a multi-broker
> situation?
>
> Thanks
>
> --
> IPVP
>
>


Re: Consumer Client - How to simulate heartbeats ?

2016-04-18 Thread Kamal C
Yes, you're right. There's no need to seek the offsets in the
onPartitionsAssigned method; the KafkaConsumer handles it internally.

Thanks for sharing this with me. I'll update it.

--Kamal

On Mon, Apr 18, 2016 at 7:29 PM, Florian Hussonnois <fhussonn...@gmail.com>
wrote:

> Yes, but the ConsumerRebalanceListener is optional and by the default
> KafkaConsumer uses a NoOpConsumerRebalanceListener if no one is provided.
>
> I think the seek() is already done internally when a consumer joins or
> quits the group. I'm not sure this line is actually needed.
>
> 2016-04-18 15:31 GMT+02:00 Kamal C <kamaltar...@gmail.com>:
>
> > When a new consumer joins to the group, it should start to read data
> > from where the other consumer left.
> >
> > --Kamal
> >
> > On Mon, Apr 18, 2016 at 6:58 PM, Florian Hussonnois <
> fhussonn...@gmail.com
> > >
> > wrote:
> >
> > > Thank you very much, the example is really helpful.
> > >
> > > My last question is : Why is it necessay to seek the consumer offsets
> > into
> > > the onPartitionsAssigned method ?
> > >
> > >
> > >
> >
> https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java#L120
> > >
> > > 2016-04-15 15:06 GMT+02:00 Kamal C <kamaltar...@gmail.com>:
> > >
> > > > Hi Florian,
> > > >
> > > > This may be helpful
> > > >
> > > >
> > >
> >
> https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java
> > > >
> > > > --Kamal
> > > >
> > > > On Fri, Apr 15, 2016 at 2:57 AM, Jason Gustafson <ja...@confluent.io
> >
> > > > wrote:
> > > >
> > > > > Hi Florian,
> > > > >
> > > > > It's actually OK if processing takes longer than the heartbeat
> > > interval,
> > > > > but it does need to finish before the session timeout expires or
> the
> > > > > consumer will be kicked out of the group (which typically is
> revealed
> > > by
> > > > > commit failures). If the problem is just that the consumer is
> > handling
> > > > too
> > > > > many messages at once, then Kafka 0.10 has an option to tune the
> > number
> > > > of
> > > > > messages returned from poll() (max.poll.records), which may be
> > helpful.
> > > > We
> > > > > also have a pause/resume API which allows you to call poll()
> without
> > > > > consuming any data. That's the best option at the moment for 0.9
> > > > consumers.
> > > > >
> > > > > For what it's worth, we've considered several times adding a
> > > heartbeat()
> > > > > API, but the challenge is figuring out how to handle rebalancing.
> > > > > Underneath the covers, we use heartbeats to find out when the group
> > is
> > > > > rebalancing, so a heartbeat() option would probably have to return
> a
> > > flag
> > > > > indicating whether a rebalance was needed. If the group has begun
> > > > > rebalancing, then you would need to call poll() before the
> expiration
> > > of
> > > > > the session timeout so that the consumer can join the rebalance.
> > > > > Alternatively, we could let heartbeat() complete the rebalance
> > itself,
> > > > but
> > > > > then you'd have to be prepared to abort processing from the
> rebalance
> > > > > callback. That's not really different from calling poll() after
> > pausing
> > > > > partitions though. The main problem in any case is that once a
> > > rebalance
> > > > > begins, you have the duration of the session timeout to stop
> > processing
> > > > and
> > > > > join the rebalance. We're seeing this problem pop up pretty much
> > > > everywhere
> > > > > that the consumer is used, so we're trying to think of some better
> > > > options
> > > > > to handle it.
> > > > >
> > > > > Thanks,
> > > > > Jason
> > > > >
> > > > >
> > > > > On Thu, Apr 14, 2016 at 12:32 PM, Florian Hussonnois <
> > > > > fhussonn...@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > I have a use case where a message can take longer than '
> > > > > > heartbeat.interval.ms' to be processed by my application. As I
> > > > > understand
> > > > > > the heartbeats of consumer are done while the poll method is
> > invoked.
> > > > > >
> > > > > > I would like to instantiate a worker thread to process the
> messages
> > > > but I
> > > > > > need to wait for the messages completion before polling again.
> > > > > >
> > > > > > Is there a way to force the consumer to make an heartbeat without
> > > > polling
> > > > > > new messages ?
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > --
> > > > > > Florian HUSSONNOIS
> > > > > >
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Florian HUSSONNOIS
> > >
> >
>
>
>
> --
> Florian HUSSONNOIS
>


Re: Consumer Client - How to simulate heartbeats ?

2016-04-18 Thread Kamal C
When a new consumer joins the group, it should start reading data
from where the other consumer left off.

--Kamal

On Mon, Apr 18, 2016 at 6:58 PM, Florian Hussonnois <fhussonn...@gmail.com>
wrote:

> Thank you very much, the example is really helpful.
>
> My last question is : Why is it necessay to seek the consumer offsets into
> the onPartitionsAssigned method ?
>
>
> https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java#L120
>
> 2016-04-15 15:06 GMT+02:00 Kamal C <kamaltar...@gmail.com>:
>
> > Hi Florian,
> >
> > This may be helpful
> >
> >
> https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java
> >
> > --Kamal
> >
> > On Fri, Apr 15, 2016 at 2:57 AM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Florian,
> > >
> > > It's actually OK if processing takes longer than the heartbeat
> interval,
> > > but it does need to finish before the session timeout expires or the
> > > consumer will be kicked out of the group (which typically is revealed
> by
> > > commit failures). If the problem is just that the consumer is handling
> > too
> > > many messages at once, then Kafka 0.10 has an option to tune the number
> > of
> > > messages returned from poll() (max.poll.records), which may be helpful.
> > We
> > > also have a pause/resume API which allows you to call poll() without
> > > consuming any data. That's the best option at the moment for 0.9
> > consumers.
> > >
> > > For what it's worth, we've considered several times adding a
> heartbeat()
> > > API, but the challenge is figuring out how to handle rebalancing.
> > > Underneath the covers, we use heartbeats to find out when the group is
> > > rebalancing, so a heartbeat() option would probably have to return a
> flag
> > > indicating whether a rebalance was needed. If the group has begun
> > > rebalancing, then you would need to call poll() before the expiration
> of
> > > the session timeout so that the consumer can join the rebalance.
> > > Alternatively, we could let heartbeat() complete the rebalance itself,
> > but
> > > then you'd have to be prepared to abort processing from the rebalance
> > > callback. That's not really different from calling poll() after pausing
> > > partitions though. The main problem in any case is that once a
> rebalance
> > > begins, you have the duration of the session timeout to stop processing
> > and
> > > join the rebalance. We're seeing this problem pop up pretty much
> > everywhere
> > > that the consumer is used, so we're trying to think of some better
> > options
> > > to handle it.
> > >
> > > Thanks,
> > > Jason
> > >
> > >
> > > On Thu, Apr 14, 2016 at 12:32 PM, Florian Hussonnois <
> > > fhussonn...@gmail.com>
> > > wrote:
> > >
> > > > Hi everyone,
> > > >
> > > > I have a use case where a message can take longer than '
> > > > heartbeat.interval.ms' to be processed by my application. As I
> > > understand
> > > > the heartbeats of consumer are done while the poll method is invoked.
> > > >
> > > > I would like to instantiate a worker thread to process the messages
> > but I
> > > > need to wait for the messages completion before polling again.
> > > >
> > > > Is there a way to force the consumer to make an heartbeat without
> > polling
> > > > new messages ?
> > > >
> > > > Thanks,
> > > >
> > > > --
> > > > Florian HUSSONNOIS
> > > >
> > >
> >
>
>
>
> --
> Florian HUSSONNOIS
>


Re: Consumer Client - How to simulate heartbeats ?

2016-04-15 Thread Kamal C
Hi Florian,

This may be helpful
https://github.com/omkreddy/kafka-examples/blob/master/consumer/src/main/java/kafka/examples/consumer/advanced/AdvancedConsumer.java

--Kamal

On Fri, Apr 15, 2016 at 2:57 AM, Jason Gustafson  wrote:

> Hi Florian,
>
> It's actually OK if processing takes longer than the heartbeat interval,
> but it does need to finish before the session timeout expires or the
> consumer will be kicked out of the group (which typically is revealed by
> commit failures). If the problem is just that the consumer is handling too
> many messages at once, then Kafka 0.10 has an option to tune the number of
> messages returned from poll() (max.poll.records), which may be helpful. We
> also have a pause/resume API which allows you to call poll() without
> consuming any data. That's the best option at the moment for 0.9 consumers.
>
> For what it's worth, we've considered several times adding a heartbeat()
> API, but the challenge is figuring out how to handle rebalancing.
> Underneath the covers, we use heartbeats to find out when the group is
> rebalancing, so a heartbeat() option would probably have to return a flag
> indicating whether a rebalance was needed. If the group has begun
> rebalancing, then you would need to call poll() before the expiration of
> the session timeout so that the consumer can join the rebalance.
> Alternatively, we could let heartbeat() complete the rebalance itself, but
> then you'd have to be prepared to abort processing from the rebalance
> callback. That's not really different from calling poll() after pausing
> partitions though. The main problem in any case is that once a rebalance
> begins, you have the duration of the session timeout to stop processing and
> join the rebalance. We're seeing this problem pop up pretty much everywhere
> that the consumer is used, so we're trying to think of some better options
> to handle it.
>
> Thanks,
> Jason
>
>
> On Thu, Apr 14, 2016 at 12:32 PM, Florian Hussonnois <
> fhussonn...@gmail.com>
> wrote:
>
> > Hi everyone,
> >
> > I have a use case where a message can take longer than '
> > heartbeat.interval.ms' to be processed by my application. As I
> understand
> > the heartbeats of consumer are done while the poll method is invoked.
> >
> > I would like to instantiate a worker thread to process the messages but I
> > need to wait for the messages completion before polling again.
> >
> > Is there a way to force the consumer to make an heartbeat without polling
> > new messages ?
> >
> > Thanks,
> >
> > --
> > Florian HUSSONNOIS
> >
>


Async Consumer: at least once delivery

2016-03-21 Thread Kamal C
Hi All,

I'm using Kafka 0.9.0.1.

I have a requirement in which consumption of records is asynchronous.


for (ConsumerRecord record : records) {
    executor.submit(new Runnable() {
        public void run() {
            // process record;
        }
    });
}
consumer.commitSync(); // Shouldn't commit here

The completion of the `for()` loop doesn't mean that the records are processed.
A record is processed only when its job gets executed successfully.

On application failure / restart, I have to re-submit the unprocessed records.
I came up with an ack-based approach, but it's not scalable. How can I track
the offsets of processed records?


--Kamal


Re: How to assign all the partitions of a topic to consumer ?

2016-03-19 Thread Kamal C
Thanks Jason! It worked. I'd missed it.
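For anyone searching the archives, a minimal sketch of that approach (the
topic name is a placeholder):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    List<TopicPartition> partitions = new ArrayList<>();
    for (PartitionInfo info : consumer.partitionsFor("my-topic"))   // placeholder topic
        partitions.add(new TopicPartition(info.topic(), info.partition()));
    consumer.assign(partitions);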


On 17-Mar-2016 2:00 AM, "Jason Gustafson" <ja...@confluent.io> wrote:

> Have you looked at partitionsFor()?
>
> -Jason
>
> On Wed, Mar 16, 2016 at 4:58 AM, Kamal C <kamaltar...@gmail.com> wrote:
>
> > Hi,
> >
> > I'm using the new consumer in assign mode. I would like to assign all
> > the partitions of a topic to the consumer.
> >
> > For that, I need to know the number of partitions available in the topic.
> >
> > consumer.assign(List<TopicPartition> partitions);
> >
> > How to programmatically get the number of partitions in a topic?
> >
> > --Kamal
> >
>


Re: seekToBeginning doesn't work without auto.offset.reset

2016-03-09 Thread Kamal C
Cody,

Use ConsumerRebalanceListener to achieve that,

ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        consumer.seekToBeginning(partitions.toArray(new TopicPartition[0]));
    }
};

consumer.subscribe(topics, listener);

On Wed, Mar 9, 2016 at 12:05 PM, Cody Koeninger  wrote:

> That suggestion doesn't work, for pretty much the same reason - at the
> time poll is first called, there is no reset policy and no committed
> offset, so NoOffsetForPartitionException is thrown
>
> I feel like the underlying problem isn't so much that seekToEnd needs
> special case behavior.  It's more that  topic metadata fetches,
> consumer position fetches, and message fetches are all lumped together
> under a single poll() call, with no way to do them individually if
> necessary.
>
> What does "work" in this situation is to just catch the exception
> (which leaves the consumer in a state where topics are assigned) and
> then seek.  But that is not exactly an elegant interface.
>
> consumer.subscribe(topics)
> try {
>   consumer.poll(0)
> } catch {
>   case x: Throwable =>
> }
> consumer.seekToBeginning()
> consumer.poll(0)
>
>
>
>
> On Tue, Mar 8, 2016 at 11:22 PM, Guozhang Wang  wrote:
> > Hi Cody,
> >
> > The problem with that code is in `seekToBeginning()` followed by
> > `subscribe(topic)`.
> >
> > Since `subscribe` call is lazy evaluated, by the time `seekToBeginning()`
> > is called no partition is assigned yet, and hence it is effectively an
> > no-op.
> >
> > Try
> >
> > consumer.subscribe(topics)
> > consumer.poll(0);  // get assigned partitions
> > consumer.seekToBeginning()
> > consumer.poll(0)
> >
> > to see if that works.
> >
> > I think it is a valid issue that can be fixed in the new consumer that,
> > upon calling seekToEnd/Beginning with no parameter, while no assigned is
> > done yet, do the coordination behind the scene; it will though change the
> > behavior of the functions as they are no longer always lazily evaluated.
> >
> >
> > Guozhang
> >
> >
> > On Tue, Mar 8, 2016 at 2:08 PM, Cody Koeninger 
> wrote:
> >
> >> Using the 0.9 consumer, I would like to start consuming at the
> >> beginning or end, without specifying auto.offset.reset.
> >>
> >> This does not seem to be possible:
> >>
> >> val kafkaParams = Map[String, Object](
> >>   "bootstrap.servers" -> conf.getString("kafka.brokers"),
> >>   "key.deserializer" -> classOf[StringDeserializer],
> >>   "value.deserializer" -> classOf[StringDeserializer],
> >>   "group.id" -> "example",
> >>   "auto.offset.reset" -> "none"
> >> ).asJava
> >> val topics = conf.getString("kafka.topics").split(",").toList.asJava
> >> val consumer = new KafkaConsumer[String, String](kafkaParams)
> >> consumer.subscribe(topics)
> >> consumer.seekToBeginning()
> >> consumer.poll(0)
> >>
> >>
> >> Results in:
> >>
> >> Exception in thread "main"
> >> org.apache.kafka.clients.consumer.NoOffsetForPartitionException:
> >> Undefined offset with no reset policy for partition: testtwo-4
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.Fetcher.resetOffset(Fetcher.java:288)
> >> at
> >>
> org.apache.kafka.clients.consumer.internals.Fetcher.updateFetchPositions(Fetcher.java:167)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:1302)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:895)
> >> at
> >>
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:853)
> >> at example.BasicKafkaConsumer$.main(BasicKafkaConsumer.scala:25)
> >>
> >>
> >> I'm assuming this is because, at the time seekToBeginning() is called,
> >> subscriptions.assignedPartitions isn't populated.  But polling in
> >> order to assign topicpartitions results in an error, which creates a
> >> chicken-or-the-egg situation.
> >>
> >> I don't want to set auto.offset.reset, because I want a hard error if
> >> the offsets are out of range at any other time during consumption.
> >>
> >
> >
> >
> > --
> > -- Guozhang
>


Re: Kafka + ZooKeeper on the same hardware?

2016-01-14 Thread Kamal C
It's a single point of failure; you may lose high availability.

On Thu, Jan 14, 2016 at 4:36 PM, Erik Forsberg  wrote:

> Hi!
>
> Pondering how to configure Kafka clusters and avoid having too many
> machines to manage.. Would it be recommended to run say a 3 node kafka
> cluster where you also run your 3 node zookeeper cluster on the same
> machines?
>
> I guess the answer is that "it depends on load", but would be interested
> in any opinions on this anyway.
>
> Thanks!
> \EF
>


Re: Kafka + ZooKeeper on the same hardware?

2016-01-14 Thread Kamal C
Yes, it can sustain one failure. I misunderstood your question.



On Thu, Jan 14, 2016 at 5:14 PM, Erik Forsberg <forsb...@opera.com> wrote:

>
>
> On 2016-01-14 12:42, Kamal C wrote:
>
>> It's a single point of failure. You may lose high-availability.
>>
>
> In this case I would like to protect myself from 1 machine going down, and
> my replication factor for Kafka would be 2. So in the case of one machine
> going down, Zookeeper cluster would still be operational, and Kafka would
> as well if I understand things correctly.
>
> Regards,
> \EF
>
>


Re: kafka producer error

2015-09-09 Thread Kamal C
Refer the mail list
http://qnalist.com/questions/6002514/new-producer-metadata-update-problem-on-2-node-cluster

https://issues.apache.org/jira/browse/KAFKA-1843

On Wed, Sep 9, 2015 at 7:37 AM, Shushant Arora 
wrote:

> Hi
>
> I have a kafka cluster with 3 brokers. I have a topic with ~50 partitions
> and a replication factor of 3.
>
> When 2 brokers are down - I m getting below error in producer code
>
> 5/09/09 00:56:15 WARN network.Selector: Error in I/O with brokerIP(Ip
> of broker which is down)
> java.net.ConnectException: Connection refused
> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
> at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
> at java.lang.Thread.run(Thread.java:745)
>
> Producer config are :
>
>
>
> value.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
> key.serializer = class
> org.apache.kafka.common.serialization.StringSerializer
> block.on.buffer.full = true
> retry.backoff.ms = 200
> buffer.memory = 10485760
> batch.size = 16384
> metrics.sample.window.ms = 3
> metadata.max.age.ms = 30
> receive.buffer.bytes = 32768
> timeout.ms = 3
> max.in.flight.requests.per.connection = 5
> bootstrap.servers = [broker1:9092,broker2:9092,broker3:9092]
> retries = 10
> max.request.size = 1048576
> send.buffer.bytes = 131072
> acks = 1
> reconnect.backoff.ms = 10
> linger.ms = 0
> metrics.num.samples = 2
> metadata.fetch.timeout.ms = 6
>
>
> Despite acks being 1, I'm getting an IOError, and it reports the error for the
> broker which is down - but my topic is replicated, and when I describe the
> topic on the Kafka console, the new leader of all partitions is the broker
> which is up.
>
> So why is the IOException coming?
>


Re: How Producer handles Network Connectivity Issues

2015-05-27 Thread Kamal C
Thanks for the response Ewen!

On Tue, May 26, 2015 at 10:52 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 It's not being switched in this case because the broker hasn't failed. It
 can still connect to all the other brokers and zookeeper. The only failure
 is of the link between a client and the broker.

 Another way to think of this is to extend the scenario with more producers.
 If I have 100 other producers and they can all still connect, would you
 still consider this a failure and expect the leader to change? Since
 network partitions (or periods of high latency, or long GC pauses, etc) can
 happen arbitrarily and clients might be spread far and wide, you can't rely
 on their connectivity as an indicator of the health of the Kafka broker.

 Of course, there's also a pretty big practical issue: since the client
 can't connect to the broker, how would it even report that it has a
 connectivity issue?

 -Ewen

 On Mon, May 25, 2015 at 10:05 PM, Kamal C kamaltar...@gmail.com wrote:

  Hi,
 
  I have a cluster of 3 Kafka brokers and a remote producer. Producer
  started to send messages to *SampleTopic*. Then I blocked the network
  connectivity between the Producer and the leader node for the topic
  *SampleTopic* but network connectivity is healthy between the cluster and
  producer is able to reach the other two nodes.
 
  *With Script*
 
  sh kafka-topics.sh --zookeeper localhost --describe
  Topic:SampleTopic    PartitionCount:1    ReplicationFactor:3    Configs:
      Topic: SampleTopic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
 
 
  Producer tries forever to reach the leader node by throwing connection
  refused exception. I understand that when there is a node failure leader
  gets switched. Why it's not switching the leader in this scenario ?
 
  --
  Kamal C
 



 --
 Thanks,
 Ewen



How Producer handles Network Connectivity Issues

2015-05-25 Thread Kamal C
Hi,

I have a cluster of 3 Kafka brokers and a remote producer. The producer
started sending messages to *SampleTopic*. Then I blocked the network
connectivity between the producer and the leader node for the topic
*SampleTopic*, but connectivity within the cluster is healthy and the
producer can still reach the other two nodes.

*With Script*

sh kafka-topics.sh --zookeeper localhost --describe
Topic:SampleTopic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: SampleTopic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0


The producer keeps trying forever to reach the leader node, throwing a
connection refused exception. I understand that when a node fails the leader
gets switched. Why isn't the leader switching in this scenario?

--
Kamal C


Re: how to wait next log message on the partition queue

2015-05-25 Thread Kamal C
It won't throw an OffsetOutOfRange error when you pass the latest offset in
the fetch request; the resulting fetch response message set will simply be empty.
You can wait for the next message to become available either by polling manually
or by configuring *maxWait* in the fetch request.
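
For example, with the 0.8 simple consumer a long-poll fetch looks roughly like
this (host, topic, offset variable and sizes are only illustrative):

    // imports: kafka.api.FetchRequest, kafka.api.FetchRequestBuilder,
    //          kafka.javaapi.FetchResponse, kafka.javaapi.consumer.SimpleConsumer
    // soTimeout should be larger than maxWait, otherwise the socket read times out first.
    SimpleConsumer consumer = new SimpleConsumer("broker1", 9092, 15000, 64 * 1024, "example-client");

    FetchRequest req = new FetchRequestBuilder()
            .clientId("example-client")
            .addFetch("SampleTopic", 0, nextOffset, 100000) // partition 0, fetch up to ~100 KB
            .maxWait(10000)   // broker holds the request up to 10 s waiting for new data
            .minBytes(1)      // respond as soon as at least one byte is available
            .build();

    FetchResponse resp = consumer.fetch(req);
    // An empty message set here just means nothing new arrived within maxWait.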

On Mon, May 25, 2015 at 3:50 PM, Nipur Patodi er.nipur.pat...@gmail.com
wrote:

 Adding to Xiao , If you are using a low level consumer (simple consumer),
 then for scenario #1 consumer will get OffsetOutOfRange error with error
 code 1.
 Please find more details at this
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
 
 link

 Thanks,

 _NIpur



 On Mon, May 25, 2015 at 2:30 PM, tao xiao xiaotao...@gmail.com wrote:

  The default behavior of high level consumer is to wait until next message
  come along. If you want to change this behavior you can change the
 consumer
  setting consumer.timeout.ms to some value that is greater than -1
 
  On Mon, 25 May 2015 at 16:57 Ganesh Nikam ganesh.ni...@gslab.com
 wrote:
 
   HI All,
  
   I have basic question on kafka partition. Lets say I have one topic
 with
   partition 0. There is one producer which is producing log messages for
   this topic and there is one consumer also which is consuming messages
 on
   this topic. Assuming consumer is faster, after some time it reaches to
   the last message on the partition. Now the questions are:
1. If the consumer tries to fetch next message, then what value
   will be return by Kafka server to consumer for offset ?
  
2. If I want my consumer to wait till producer sends next log
   message then how can I achieve this ? How can I make my consumer to
 wait
   for next message ?
  
  
   - Ganesh
  
 



Re: Kafka Cluster Issue

2015-05-05 Thread Kamal C
This is resolved. I had missed the host entry configuration in my infrastructure.
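
For anyone who hits the same symptom: the brokers register themselves by
hostname, and that hostname is what clients get back in the topic metadata, so
every producer / consumer machine must be able to resolve the broker hostnames.
In my case the fix was adding the broker entries to /etc/hosts on the client
machines, roughly like this (addresses and names are only illustrative):

    192.168.11.140   tcstest2.nmsworks.co.in
    192.168.11.141   tcstest3.nmsworks.co.in
    192.168.11.142   tcstest1.nmsworks.co.in

Alternatively, advertised.host.name can be set in server.properties so that the
brokers advertise an address the clients can already reach.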

On Mon, May 4, 2015 at 10:35 AM, Kamal C kamaltar...@gmail.com wrote:

 We are running ZooKeeper in ensemble (Cluster of 3 / 5).  With further
 investigation, I found that the Connect Exception throws for all inflight
 producers.

 Say we are pushing 50 msg/s to a topic. Stop the leader Kafka for that
 topic. Producers are unable to push messages to the Kafka Cluster and
 starts throwing the Connect Exception.



 *Exception* WARN 2015-05-04 10:27:41,052 [kafka-producer-network-thread |
 NOTIFICATION_CATEGORY_ALARM]: Selector:poll() :  : Error in I/O with
 tcstest2.nmsworks.co.in/192.168.11.140
 java.net.ConnectException: Connection refused
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 ~[?:1.7.0_40]
 at
 sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
 ~[?:1.7.0_40]
 at
 org.apache.kafka.common.network.Selector.poll(Selector.java:238)
 [kafka-clients-0.8.2.0.jar:?]
 at
 org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
 [kafka-clients-0.8.2.0.jar:?]
 at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
 [kafka-clients-0.8.2.0.jar:?]
 at
 org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
 [kafka-clients-0.8.2.0.jar:?]


 *Description of a Kafka topic*

 [root@tcstest2 bin]# sh kafka-topics.sh --zookeeper localhost:2181
 --describe | grep NOTIFICATION_CATEGORY_ALARM
 Topic:NOTIFICATION_CATEGORY_ALARM    PartitionCount:1    ReplicationFactor:3    Configs:
     Topic: NOTIFICATION_CATEGORY_ALARM    Partition: 0    Leader: 1    Replicas: 2,0,1    Isr: 1,0


 Leader is switching but producers are unable to find the new leader. How
 to resolve it?


 On Sun, May 3, 2015 at 11:13 PM, Jiangjie Qin j...@linkedin.com.invalid
 wrote:

 What do you mean by cluster mode with 3 Zookeeper and 3 Kafka brokers? Do
 you mean 1 Zookeeper and 3 brokers?

 On 5/2/15, 11:01 PM, Kamal C kamaltar...@gmail.com wrote:

 Any comments on this issue?
 
 On Sat, May 2, 2015 at 9:16 AM, Kamal C kamaltar...@gmail.com wrote:
 
  Hi,
  We are using Kafka_2.10-0.8.2.0, new Kafka producer and Kafka Simple
  Consumer. In Standalone mode, 1 ZooKeeper and 1 Kafka we haven't faced
 any
  problems.
 
  In cluster mode, 3 ZooKeeper and 3 Kafka Brokers. We did some sanity
  testing by bringing a Kafka node down then a random Producer starts to
  throw Connect Exception continuously and tries to connect with the dead
  node (not all producers).
 
  Is there any configuration available to avoid this exception ?
 
  Regards,
  Kamal C
 
 
 
 
 





Re: Kafka Cluster Issue

2015-05-03 Thread Kamal C
Any comments on this issue?

On Sat, May 2, 2015 at 9:16 AM, Kamal C kamaltar...@gmail.com wrote:

 Hi,
 We are using Kafka_2.10-0.8.2.0, new Kafka producer and Kafka Simple
 Consumer. In Standalone mode, 1 ZooKeeper and 1 Kafka we haven't faced any
 problems.

 In cluster mode, 3 ZooKeeper and 3 Kafka Brokers. We did some sanity
 testing by bringing a Kafka node down then a random Producer starts to
 throw Connect Exception continuously and tries to connect with the dead
 node (not all producers).

 Is there any configuration available to avoid this exception ?

 Regards,
 Kamal C







Re: Kafka Cluster Issue

2015-05-03 Thread Kamal C
We are running ZooKeeper in an ensemble (cluster of 3 / 5). On further
investigation, I found that the Connect Exception is thrown for all in-flight
producers.

Say we are pushing 50 msg/s to a topic and we stop the leader Kafka broker for
that topic. The producers are unable to push messages to the Kafka cluster and
start throwing the Connect Exception.



*Exception* WARN 2015-05-04 10:27:41,052 [kafka-producer-network-thread |
NOTIFICATION_CATEGORY_ALARM]: Selector:poll() :  : Error in I/O with
tcstest2.nmsworks.co.in/192.168.11.140
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
~[?:1.7.0_40]
at
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
~[?:1.7.0_40]
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
[kafka-clients-0.8.2.0.jar:?]
at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
[kafka-clients-0.8.2.0.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
[kafka-clients-0.8.2.0.jar:?]
at
org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
[kafka-clients-0.8.2.0.jar:?]


*Description of a Kafka topic*

[root@tcstest2 bin]# sh kafka-topics.sh --zookeeper localhost:2181
--describe | grep NOTIFICATION_CATEGORY_ALARM
Topic:NOTIFICATION_CATEGORY_ALARM    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: NOTIFICATION_CATEGORY_ALARM    Partition: 0    Leader: 1    Replicas: 2,0,1    Isr: 1,0


The leader is switching, but the producers are unable to find the new leader.
How can I resolve this?


On Sun, May 3, 2015 at 11:13 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 What do you mean by cluster mode with 3 Zookeeper and 3 Kafka brokers? Do
 you mean 1 Zookeeper and 3 brokers?

 On 5/2/15, 11:01 PM, Kamal C kamaltar...@gmail.com wrote:

 Any comments on this issue?
 
 On Sat, May 2, 2015 at 9:16 AM, Kamal C kamaltar...@gmail.com wrote:
 
  Hi,
  We are using Kafka_2.10-0.8.2.0, new Kafka producer and Kafka Simple
  Consumer. In Standalone mode, 1 ZooKeeper and 1 Kafka we haven't faced
 any
  problems.
 
  In cluster mode, 3 ZooKeeper and 3 Kafka Brokers. We did some sanity
  testing by bringing a Kafka node down then a random Producer starts to
  throw Connect Exception continuously and tries to connect with the dead
  node (not all producers).
 
  Is there any configuration available to avoid this exception ?
 
  Regards,
  Kamal C
 
 
 
 
 




Kafka Cluster Issue

2015-05-02 Thread Kamal C
Hi,
We are using Kafka_2.10-0.8.2.0 with the new Kafka producer and the Kafka Simple
Consumer. In standalone mode (1 ZooKeeper, 1 Kafka broker) we haven't faced any
problems.

In cluster mode (3 ZooKeeper, 3 Kafka brokers) we did some sanity testing by
bringing a Kafka node down; then a random producer (not all producers) starts
throwing a Connect Exception continuously and keeps trying to connect to the
dead node.

Is there any configuration available to avoid this exception ?

Regards,
Kamal C