Doubts Kafka

2015-02-08 Thread Eduardo Costa Alfaia
Hi Guys,

I have some doubts about Kafka. The first is: why do some applications 
prefer to connect to zookeeper instead of the brokers? Connecting to 
zookeeper could create overhead, because we are inserting another element 
between producer and consumer. Another question is about the information 
sent by the producer: in my tests the producer sends messages to the brokers 
and within a few minutes my hard disk is full (my hard disk has 250GB). Is 
there something to do in the configuration to minimize this?

Thanks 
-- 
Informativa sulla Privacy: http://www.unibs.it/node/8155


Can't send a keyedMessage to brokers with partitioner.class=kafka.producer.DefaultPartitioner

2015-02-08 Thread Zijing Guo
Hi,

I have a 2-node kafka cluster with 2 instances of brokers and zookeepers. 
I then create a topic kafka-test with 2 partitions and replication-factor 
= 2. My producer config is:

    {partitioner.class     kafka.producer.DefaultPartitioner
     metadata.broker.list  172.32.1.248:9092,172.32.1.251:9092
     request.required.acks 1}

For the DefaultPartitioner, it will calculate a hash value and take it 
modulo num_partitions to decide which partition the data will go to, so I 
create my keyed message:

    val keyMsg = KeyedMessage("kafka-test", "a", "test message!")
    prod.send(keyMsg)

"a"'s hashValue is 97 and 97 % 2 = 1, so the data should go to partition 1. 
However, the data didn't get sent to the brokers (I have a console consumer 
running that didn't receive any message from this topic). If I create the 
message without the key, it works fine:

    val keyMsg = KeyedMessage("kafka-test", "test message!")
    prod.send(keyMsg)

Am I using the key wrong or anything?
Thanks
Edwin
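
A minimal sketch, in Java, of the partition arithmetic being described. It
mirrors, rather than reproduces, kafka.producer.DefaultPartitioner (which in
0.8.x computes Utils.abs(key.hashCode) % numPartitions); names here are
illustrative.

public class PartitionSketch {
    // Key-based partition selection as described in the message above.
    static int partitionFor(Object key, int numPartitions) {
        return Math.abs(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // "a".hashCode() == 97, and 97 % 2 == 1, so the record targets partition 1.
        System.out.println(partitionFor("a", 2));
    }
}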


Re: high cpu and network traffic when cluster has no topic

2015-02-08 Thread Jay Kreps
Sounds like this patch fixed the issue. It would be good to get some review
on KAFKA-1919; it is only a four-line change.

On Wed, Feb 4, 2015 at 1:15 PM, Steven Wu stevenz...@gmail.com wrote:

 Bhavesh,

 unfortunately, ps cmd in Mac doesn't display thread id. I tried DTrace, but
 it only shows kernel thread id (not Java thread id).

 anyway, I updated the jira with producer metrics. it clearly shows request
 rate shoot up to 18K/sec.

 Thanks,
 Steven

 On Wed, Feb 4, 2015 at 9:48 AM, Steven Wu stevenz...@gmail.com wrote:

  Bhavesh,
 
  this is on Mac OS. I couldn't get similar options to make ps/jstack work
  on Mac. will continue to try if I can make them work.
 
  logging output does show kafka-producer-network-thread sends two
  metadata requests per millisecond.
 
  Thanks,
  Steven
 
 
  On Wed, Feb 4, 2015 at 9:15 AM, Bhavesh Mistry 
 mistry.p.bhav...@gmail.com
   wrote:
 
  Hi Steven,
 
  Can you please try to see if io thread is indeed a problem ?  The
  following
  on works on Linux:
 
  ps  -p $java_pid -L -o tid,pcpu
  jstack -F $java_pid
 
  Then compare the thread # (you may have to convert the hex # to decimal)
  between the jstack and ps output.  This will tell you which thread is
  consuming more CPU for that process.
 
  Thanks,
 
  Bhavesh
 
  On Wed, Feb 4, 2015 at 9:01 AM, Steven Wu stevenz...@gmail.com wrote:
 
   I have re-run my unit test with 0.8.2.0. same tight-loop problem
  happened
   after a few mins.
  
   On Tue, Feb 3, 2015 at 10:00 PM, Guozhang Wang wangg...@gmail.com
  wrote:
  
Steven, you may be hitting on KAFKA-1642
https://issues.apache.org/jira/browse/KAFKA-1642.
   
As Jay said, a bunch of such issues are fixed in the new release.
  Please
let us know if you still see the issue with it.
   
Guozhang
   
On Tue, Feb 3, 2015 at 8:52 PM, Steven Wu stevenz...@gmail.com
  wrote:
   
 sure. will try my unit test again with 0.8.2.0 release tomorrow
 and
report
 back my findings.

 On Tue, Feb 3, 2015 at 8:42 PM, Jay Kreps jay.kr...@gmail.com
  wrote:

  Hey Steven,
 
  That sounds like a bug. I think we fixed a few producer high cpu
   issues
  since the beta, I wonder if you could repeat the same test with
  the
 0.8.2.
  final release?
 
  -Jay
 
  On Tue, Feb 3, 2015 at 8:37 PM, Steven Wu stevenz...@gmail.com
 
wrote:
 
    actually, my local test can reproduce the issue although not
    immediately. seems to happen after a few mins. I enabled TRACE level
    logging. here seems to be the tight loop. you can see that there are
    two metadata requests in one millisecond.
  
    kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 - Ignoring empty metadata response with correlation id 360185.
    kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 - Trying to send metadata request to node -2
    kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:374 - Sending metadata request ClientRequest(expectResponse=true, payload=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=360186,client_id=foo}, body={topics=[]})) to node -2
    kafka-producer-network-thread | foo 20:34:32,626 TRACE NetworkClient:301 - Ignoring empty metadata response with correlation id 360186.
    kafka-producer-network-thread | foo 20:34:32,626 DEBUG NetworkClient:369 - Trying to send metadata request to node -2
  
  
   On Tue, Feb 3, 2015 at 8:10 PM, Steven Wu 
 stevenz...@gmail.com
  
 wrote:
  
Hi,
   
We have observed high cpu and high network traffic problem
  when
1) cluster (0.8.1.1) has no topic
2) KafkaProducer (0.8.2-beta) object is created without
  sending
   any
   traffic
   
We have observed such problem twice. In both cases, problem
  went
away
immediately after one/any topic is created.
   
Is this a known issue? Just want to check with the community
   first
  before
I spend much time to reproduce it.
   
I couldn't reproduce the issue with similar setup with unit
  test
code
  in
IDE. start two brokers with no topic locally on my laptop.
   create a
KafkaProducer object without sending any msgs. but I only
  tested
with
0.8.2-beta for both broker and producer.
   
Thanks,
Steven
   
  
 

   
   
   
--
-- Guozhang
   
  
 
 
 



Re: New Producer - ONLY sync mode?

2015-02-08 Thread Jay Kreps
Hey Otis,

Yeah, Gwen is correct. The future from the send will be satisfied when the
response is received so it will be exactly the same as the performance of
the sync producer previously.

-Jay
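
A minimal sketch of the pattern described above: blocking on the returned
future gives the old sync-producer behavior. The broker address, topic name,
and serializer choice below are assumptions.

import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SyncSendSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.ByteArraySerializer");
        Producer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(props);

        // send() stays asynchronous; get() blocks until the broker acks the write.
        Future<RecordMetadata> future =
            producer.send(new ProducerRecord<byte[], byte[]>("my-topic", "hello".getBytes()));
        RecordMetadata meta = future.get();
        System.out.println("written at offset " + meta.offset());
        producer.close();
    }
}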

On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira gshap...@cloudera.com wrote:

 If I understood the code and Jay correctly - if you wait for the
 future it will be a similar delay to that of the old sync producer.

 Put another way, if you test it out and see longer delays than the
 sync producer had, we need to find out why and fix it.

 Gwen

 On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
 otis.gospodne...@gmail.com wrote:
  Hi,
 
  Nope, unfortunately it can't do that.  X is a remote app, doesn't listen
 to
  anything external, calls Y via HTTPS.  So X has to decide what to do with
  its data based on Y's synchronous response.  It has to block until Y
  responds.  And it wouldn't be pretty, I think, because nobody wants to
 run
  apps that talk to remote servers and hang on to connections more than
 they
  have to.  But perhaps that is the only way?  Or maybe the answer to I'm
  guessing the delay would be more or less the same as if the Producer was
  using SYNC mode? is YES, in which case the connection from X to Y would
 be
  open for just as long as with a SYNC producer running in Y?
 
  Thanks,
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log Management
   Solr & Elasticsearch Support * http://sematext.com/
 
 
  On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Can Y have a callback that will handle the notification to X?
  In this case, perhaps Y can be async and X can buffer the data until
  the callback triggers and says all good (or resend if the callback
  indicates an error)
 
  On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
  otis.gospodne...@gmail.com wrote:
   Hi,
  
   Thanks for the info.  Here's the use case.  We have something up
 stream
   sending data, say a log shipper called X.  It sends it to some remote
   component Y.  Y is the Kafka Producer and it puts data into Kafka.
 But Y
   needs to send a reply to X and tell it whether it successfully put all
  its
   data into Kafka.  If it did not, Y wants to tell X to buffer data
 locally
   and resend it later.
  
   If producer is ONLY async, Y can't easily do that.  Or maybe Y would
 just
   need to wait for the Future to come back and only then send the
 response
   back to X?  If so, I'm guessing the delay would be more or less the
 same
  as
   if the Producer was using SYNC mode?
  
   Thanks,
   Otis
   --
   Monitoring * Alerting * Anomaly Detection * Centralized Log Management
    Solr & Elasticsearch Support * http://sematext.com/
  
  
   On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
  
   Yeah as Gwen says there is no sync/async mode anymore. There is a new
   configuration which does a lot of what async did in terms of allowing
   batching:
  
   batch.size - This is the target amount of data per partition the
 server
   will attempt to batch together.
   linger.ms - This is the time the producer will wait for more data
 to be
   sent to better batch up writes. The default is 0 (send immediately).
 So
  if
   you set this to 50 ms the client will send immediately if it has
 already
   filled up its batch, otherwise it will wait to accumulate the number
 of
   bytes given by batch.size.
  
   To send asynchronously you do
  producer.send(record)
   whereas to block on a response you do
  producer.send(record).get();
   which will wait for acknowledgement from the server.
  
    One advantage of this model is that the client will do its best to
  batch
   under the covers even if linger.ms=0. It will do this by batching
 any
  data
   that arrives while another send is in progress into a single
   request--giving a kind of group commit effect.
  
   The hope is that this will be both simpler to understand (a single
 api
  that
   always works the same) and more powerful (you always get a response
 with
   error and offset information whether or not you choose to use it).
  
   -Jay
  
  
   On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira gshap...@cloudera.com
 
   wrote:
  
If you want to emulate the old sync producer behavior, you need to
 set
the batch size to 1  (in producer config) and wait on the future
 you
get from Send (i.e. future.get)
   
I can't think of good reasons to do so, though.
   
Gwen
   
   
On Mon, Feb 2, 2015 at 11:08 AM, Otis Gospodnetic
otis.gospodne...@gmail.com wrote:
 Hi,

 Is the plan for New Producer to have ONLY async mode?  I'm asking
   because
 of this info from the Wiki:


- The producer will always attempt to batch data and will
 always
immediately return a SendResponse which acts as a Future to
 allow
   the
client to await the completion of the request.


 The word "always" makes me think there will be no sync mode.

 

Console Producer Throwing LeaderNotAvailableException Despite Existing Leader for Partition

2015-02-08 Thread Alex Melville
Howdy all,

I recently upgraded to Kafka 0.8.2.0 and am trying to verify that
everything still works as expected. I spin up two brokers, one zk instance,
and then create a topic using

kafka-topics.sh --create --zookeeper ad-0104:2181 --topic deleteme
--partitions 2 --replication-factor 1

Then I run --describe to check if the partitions have leaders. I get


kafka-topics.sh --describe --zookeeper ad-0104:2181 --topic deleteme

Topic:deleteme PartitionCount:2 ReplicationFactor:1 Configs:
Topic: deleteme Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: deleteme Partition: 1 Leader: 1 Replicas: 1 Isr: 1


Finally, I run the console producer

kafka-console-producer.sh --broker-list ad-0102:9092 --topic deleteme

I get the following warning

[2015-02-08 00:36:24,244] WARN Property topic is not valid
(kafka.utils.VerifiableProperties)

and then it waits for console input. When I try to send a message I get the
following list of error messages

[2015-02-08 00:37:04,735] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:04,751] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:04,752] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: deleteme
(kafka.producer.async.DefaultEventHandler)
[2015-02-08 00:37:04,859] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:04,863] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:04,863] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: deleteme
(kafka.producer.async.DefaultEventHandler)
[2015-02-08 00:37:04,968] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:04,974] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:04,974] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: deleteme
(kafka.producer.async.DefaultEventHandler)
[2015-02-08 00:37:05,079] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:05,084] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:05,084] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: deleteme
(kafka.producer.async.DefaultEventHandler)
[2015-02-08 00:37:05,189] WARN Error while fetching metadata
[{TopicMetadata for topic deleteme -
No partition metadata for topic deleteme due to
kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2015-02-08 00:37:05,191] ERROR Failed to send requests for topics deleteme
with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
[2015-02-08 00:37:05,192] ERROR Error in handling batch of 1 events
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
at

Re: New Producer - ONLY sync mode?

2015-02-08 Thread Jay Kreps
I implemented the flush() call I hypothesized earlier in this thread as a
patch on KAFKA-1865.

So now
  producer.flush()
will block until all buffered requests complete. The post condition is that
all previous send futures are satisfied and have error/offset information.
This is a little easier to use than keeping a list of all the futures and
also probably a bit more efficient. It also immediately unblocks all
requests regardless of the linger.ms setting, which is in keeping with the
name I think and important for the use case we described.

Code here:
https://issues.apache.org/jira/browse/KAFKA-1865

-Jay
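
Against that patch, the batch pattern from earlier in this thread would look
roughly like the sketch below. Note flush() is the proposed KAFKA-1865 API,
not a released method at this point, and the names are illustrative.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class FlushSketch {
    // Send a whole batch without blocking per record, then flush once.
    static void sendBatch(Producer<byte[], byte[]> producer,
                          List<ProducerRecord<byte[], byte[]>> batch) throws Exception {
        List<Future<RecordMetadata>> futures =
            new ArrayList<Future<RecordMetadata>>();
        for (ProducerRecord<byte[], byte[]> record : batch)
            futures.add(producer.send(record));   // stays asynchronous
        producer.flush();                         // proposed: block until buffered sends complete
        for (Future<RecordMetadata> future : futures)
            future.get();                         // already satisfied; throws if that send failed
    }
}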

On Sat, Feb 7, 2015 at 1:24 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Otis,

 Yeah, Gwen is correct. The future from the send will be satisfied when the
 response is received so it will be exactly the same as the performance of
 the sync producer previously.

 -Jay

 On Mon, Feb 2, 2015 at 1:34 PM, Gwen Shapira gshap...@cloudera.com
 wrote:

 If I understood the code and Jay correctly - if you wait for the
 future it will be a similar delay to that of the old sync producer.

 Put another way, if you test it out and see longer delays than the
 sync producer had, we need to find out why and fix it.

 Gwen

 On Mon, Feb 2, 2015 at 1:27 PM, Otis Gospodnetic
 otis.gospodne...@gmail.com wrote:
  Hi,
 
  Nope, unfortunately it can't do that.  X is a remote app, doesn't
 listen to
  anything external, calls Y via HTTPS.  So X has to decide what to do
 with
  its data based on Y's synchronous response.  It has to block until Y
  responds.  And it wouldn't be pretty, I think, because nobody wants to
 run
  apps that talk to remote servers and hang on to connections more than
 they
  have to.  But perhaps that is the only way?  Or maybe the answer to I'm
  guessing the delay would be more or less the same as if the Producer was
  using SYNC mode? is YES, in which case the connection from X to Y
 would be
  open for just as long as with a SYNC producer running in Y?
 
  Thanks,
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log Management
  Solr & Elasticsearch Support * http://sematext.com/
 
 
  On Mon, Feb 2, 2015 at 4:03 PM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Can Y have a callback that will handle the notification to X?
  In this case, perhaps Y can be async and X can buffer the data until
  the callback triggers and says all good (or resend if the callback
  indicates an error)
 
  On Mon, Feb 2, 2015 at 12:56 PM, Otis Gospodnetic
  otis.gospodne...@gmail.com wrote:
   Hi,
  
   Thanks for the info.  Here's the use case.  We have something up
 stream
   sending data, say a log shipper called X.  It sends it to some remote
   component Y.  Y is the Kafka Producer and it puts data into Kafka.
 But Y
   needs to send a reply to X and tell it whether it successfully put
 all
  its
   data into Kafka.  If it did not, Y wants to tell X to buffer data
 locally
   and resend it later.
  
   If producer is ONLY async, Y can't easily do that.  Or maybe Y would
 just
   need to wait for the Future to come back and only then send the
 response
   back to X?  If so, I'm guessing the delay would be more or less the
 same
  as
   if the Producer was using SYNC mode?
  
   Thanks,
   Otis
   --
   Monitoring * Alerting * Anomaly Detection * Centralized Log
 Management
  Solr & Elasticsearch Support * http://sematext.com/
  
  
   On Mon, Feb 2, 2015 at 3:13 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
  
   Yeah as Gwen says there is no sync/async mode anymore. There is a
 new
   configuration which does a lot of what async did in terms of
 allowing
   batching:
  
   batch.size - This is the target amount of data per partition the
 server
   will attempt to batch together.
   linger.ms - This is the time the producer will wait for more data
 to be
   sent to better batch up writes. The default is 0 (send
 immediately). So
  if
   you set this to 50 ms the client will send immediately if it has
 already
   filled up its batch, otherwise it will wait to accumulate the
 number of
   bytes given by batch.size.
  
   To send asynchronously you do
  producer.send(record)
   whereas to block on a response you do
  producer.send(record).get();
   which will wait for acknowledgement from the server.
  
    One advantage of this model is that the client will do its best to
  batch
   under the covers even if linger.ms=0. It will do this by batching
 any
  data
   that arrives while another send is in progress into a single
   request--giving a kind of group commit effect.
  
   The hope is that this will be both simpler to understand (a single
 api
  that
   always works the same) and more powerful (you always get a response
 with
   error and offset information whether or not you choose to use it).
  
   -Jay
  
  
   On Mon, Feb 2, 2015 at 11:15 AM, Gwen Shapira 
 gshap...@cloudera.com
   wrote:
  
If you want to emulate the old sync producer behavior, you need
 to set
the batch 

Re: Not found NewShinyProducer sync performance metrics

2015-02-08 Thread Manikumar Reddy
You can configure the JMX ports by using the system properties below.

 -Dcom.sun.management.jmxremote.port=
 -Dcom.sun.management.jmxremote.rmi.port=8889

On Fri, Feb 6, 2015 at 9:19 AM, Xinyi Su xiny...@gmail.com wrote:

 Hi,

 I am trying to use JConsole to connect to a remote Kafka broker which is
 running behind a firewall, but the connection is blocked by the firewall.

 I can specify the JMX registry port by setting JMX_PORT=, which is allowed
 by the firewall, but I cannot specify the ephemeral port, which is always
 chosen randomly at startup. The ephemeral port is what the JMX RMI server
 listens on and through which the actual data exchange takes place. It is
 randomly assigned and I have no way to specify it as some port which the
 firewall does not block.

 How can I solve this issue, since I cannot access JConsole because of the
 firewall?

 Thanks.
 Xinyi

 On 6 February 2015 at 07:24, Otis Gospodnetic otis.gospodne...@gmail.com
 wrote:

  Not announced yet, but http://sematext.com/spm should be showing you all
  the new shiny Kafka (new producer) metrics out of the box.  If you don't
  see them, please shout (I know we have a bit more tweaking to do in the
  coming day-two-three).
 
  If you want to just dump MBeans from JMX manually and eyeball the output,
  you could use something like https://github.com/sematext/jmxc to dump
 the
  whole JMX content of your Java Consumer, Producer, or Broker.
 
  Otis
  --
  Monitoring * Alerting * Anomaly Detection * Centralized Log Management
   Solr & Elasticsearch Support * http://sematext.com/
 
 
  On Thu, Feb 5, 2015 at 5:58 AM, Manikumar Reddy ku...@nmsworks.co.in
  wrote:
 
   New Producer uses Kafka's own metrics api. Currently metrics are
 reported
   using jmx. Any jmx monitoring tool (jconsole) can be used for
 monitoring.
   On Feb 5, 2015 3:56 PM, Xinyi Su xiny...@gmail.com wrote:
  
Hi,
I am using kafka-producer-perf-test.sh to study NewShinyProducer
 *sync*
performance.
   
I have not found any CSV output or metrics collector for
  NewShinyProducer
sync performance.
   
Would you like to share with me about how to collect NewShinyProducer
metrics?
   
Thanks.
   
Best regards.
Xinyi
   
  
 



one message consumed by both consumers in the same group?

2015-02-08 Thread Yang
my understanding is that for 2 consumers within the same group, a message
goes to only one of the consumers, not both.

but I did a simple test: get the kafka distro, copy the code dir to
/tmp/kafka/

then run console consumer from both dirs at the same time

bin/kafka-console-consumer.sh --.

all the parameters are the same

then I use console producer to dump some messages into the queue. the
message does show up on both ends.


where am I going wrong?

thanks
Yang


Re: one message consumed by both consumers in the same group?

2015-02-08 Thread Manikumar Reddy
Hi,

bin/kafka-console-consumer.sh --.

 all the parameters are the same


 You need to set the same group.id to create a consumer group. By default
the console consumer creates a random group.id.
 You can set group.id by using the --consumer.config /tmp/consumer.props
flag.

  $ echo group.id=1 > /tmp/consumer.props


High Latency in Kafka

2015-02-08 Thread Vineet Mishra
Hi All,

I have some log files of around 30GB, and I am trying to event-process
these logs by pushing them to Kafka. I can clearly see that the throughput
achieved while publishing these events to Kafka is quite slow.

For the single 30GB log file mentioned, Logstash has been continuously
emitting to Kafka for more than 2 days but has still processed just 60% of
the log data. I am looking for a way to increase the efficiency of
publishing the events to Kafka, as with this rate of data ingestion I don't
think it will be a good option to move ahead.

Looking for performance improvements for the same.

Experts' advice required!

Thanks!


Re: Console Producer Throwing LeaderNotAvailableException Despite Existing Leader for Partition

2015-02-08 Thread tao xiao
Alex,

I got a similar error before due to incorrect network binding of my laptop's
wireless interface. You can try setting advertised.host.name=<kafka server
hostname> in server.properties and run it again.

On Sun, Feb 8, 2015 at 8:38 AM, Alex Melville amelvi...@g.hmc.edu wrote:

 Howdy all,

 I recently upgraded to Kafka 0.8.2.0 and am trying to verify that
 everything still works as expected. I spin up two brokers, one zk instance,
 and then create a topic using

 kafka-topics.sh --create --zookeeper ad-0104:2181 --topic deleteme
 --partitions 2 --replication-factor 1

 Then I run --describe to check if the partitions have leaders. I get


 kafka-topics.sh --describe --zookeeper ad-0104:2181 --topic deleteme

 Topic:deleteme PartitionCount:2 ReplicationFactor:1 Configs:
 Topic: deleteme Partition: 0 Leader: 0 Replicas: 0 Isr: 0
 Topic: deleteme Partition: 1 Leader: 1 Replicas: 1 Isr: 1


 Finally, I run the console producer

 kafka-console-producer.sh --broker-list ad-0102:9092 --topic deleteme

 I get the following warning

 [2015-02-08 00:36:24,244] WARN Property topic is not valid
 (kafka.utils.VerifiableProperties)

 and then it waits for console input. When I try to send a message I get the
 following list of error messages

 [2015-02-08 00:37:04,735] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,751] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,752] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: deleteme
 (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:04,859] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,863] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,863] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: deleteme
 (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:04,968] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,974] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:04,974] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: deleteme
 (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:05,079] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:05,084] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:05,084] ERROR Failed to collate messages by topic,
 partition due to: Failed to fetch topic metadata for topic: deleteme
 (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:05,189] WARN Error while fetching metadata
 [{TopicMetadata for topic deleteme -
 No partition metadata for topic deleteme due to
 kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
 kafka.common.LeaderNotAvailableException
  (kafka.producer.BrokerPartitionInfo)
 [2015-02-08 00:37:05,191] ERROR Failed to send requests for topics deleteme
 with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
 [2015-02-08 00:37:05,192] ERROR Error in handling batch of 1 events
 (kafka.producer.async.ProducerSendThread)
 

Re: New Producer - ONLY sync mode?

2015-02-08 Thread Jay Kreps
Steve

In terms of mimicking the sync behavior, I think that is what .get() does,
no?

We are always returning the offset and error information. The example I
gave didn't make use of it, but you definitely can make use of it if you
want to.

-Jay
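
For the per-message failure tracking Steve describes, a sketch of the
callback form of send; the retry hook here is hypothetical.

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CallbackSketch {
    // Track success/failure per record without blocking the sender.
    static void sendTracked(Producer<byte[], byte[]> producer,
                            final ProducerRecord<byte[], byte[]> record) {
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null)
                    bufferForRetry(record);   // this record failed even if batch-mates succeeded
            }
        });
    }

    static void bufferForRetry(ProducerRecord<byte[], byte[]> record) {
        // hypothetical: application-specific buffering and resend logic
    }
}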

On Wed, Feb 4, 2015 at 9:58 AM, Steve Morin st...@stevemorin.com wrote:

 Looking at this thread, I would ideally want something, or at least the right
 recipe, to mimic sync behavior like Otis is talking about.

 In the second case, I would like to be able to individually know if messages
 have failed, regardless of whether they are in separate batches, sort of like
 what Kinesis does, as Pradeep mentioned.
 -Steve

 On Wed, Feb 4, 2015 at 11:19 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Yeah totally. Using a callback is, of course, the Right Thing for this
 kind
  of stuff. But I have found that kind of asynchronous thinking can be hard
  for people. Even if you get out of the pre-java 8 syntactic pain that
  anonymous inner classes inflict just dealing with multiple threads of
  control without creating async spaghetti can be a challenge for complex
  stuff. That is really the only reason for the futures in the api, they
 are
  strictly less powerful than the callbacks, but at least using them you
 can
  just call .get() and pretend it is blocking.
 
  -Jay
 
  On Wed, Feb 4, 2015 at 7:19 AM, Joe Stein joe.st...@stealth.ly wrote:
 
   Now that 0.8.2.0 is in the wild I look forward to working with it more and
    seeing what folks start to do with this function:
  
  
 
  https://dist.apache.org/repos/dist/release/kafka/0.8.2.0/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#send(org.apache.kafka.clients.producer.ProducerRecord,org.apache.kafka.clients.producer.Callback)
    and keeping it fully non-blocking.
  
   One sprint I know of coming up is going to have the new producer as a
   component in their reactive calls and handling bookkeeping and retries
   through that type of call back approach. Should work well (haven't
 tried
   but don't see why not) with Akka, ScalaZ, RxJava, Finagle, etc, etc,
 etc
  in
   functional languages and frameworks.
  
    I think as JDK 8 starts to get out into the wild more (maybe after JDK 7's
    EOL) the use of .get will be reduced (imho) and folks will be thinking
   more
    about non-blocking vs blocking and not so much sync vs async, but my
    crystal ball is just back from the shop so we'll see =8^)
  
   /***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
   /
  
   On Tue, Feb 3, 2015 at 10:45 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
  
Hey guys,
   
I guess the question is whether it really matters how many underlying
network requests occur? It is very hard for an application to depend
 on
this even in the old producer since it depends on the partitions
   placement
(a send to two partitions may go to either one machine or two and so
 it
will send either one or two requests). So when you send a batch in
 one
   call
you may feel that is all at once, but that is only actually
  guaranteed
   if
all messages have the same partition.
   
The challenge is allowing even this in the presence of bounded
 request
sizes which we have in the new producer. The user sends a list of
  objects
and the serialized size that will result is not very apparent to
 them.
  If
you break it up into multiple requests then that is kind of further
   ruining
the illusion of a single send. If you don't then you have to just
 error
   out
which is equally annoying to have to handle.
   
But I'm not sure if from your description you are saying you actually
   care
how many physical requests are issued. I think it is more like it is
  just
syntactically annoying to send a batch of data now because it needs a
  for
loop.
   
Currently to do this you would do:
   
List<Future<RecordMetadata>> responses = new ArrayList<Future<RecordMetadata>>();
for (ProducerRecord input : recordBatch)
    responses.add(producer.send(input));
for (Future<RecordMetadata> response : responses)
    response.get();
   
If you don't depend on the offset/error info we could add a flush
 call
  so
you could instead do
for (ProducerRecord input : recordBatch)
    producer.send(input);
producer.flush();
   
But if you do want the error/offset then you are going to be back to
  the
original case.
   
Thoughts?
   
-Jay
   
On Mon, Feb 2, 2015 at 1:48 PM, Gwen Shapira gshap...@cloudera.com
wrote:
   
  I've been thinking about that too, since both Flume and Sqoop rely on
  the send(List) API of the old producer.

 I'd like to see this API come back, but I'm debating how we'd
 handle
 errors. IIRC, the old API would fail an entire batch on a single
 error, which can lead to duplicates. Having N callbacks lets me
 retry

Re: How to delete defunct topics

2015-02-08 Thread Jagbir Hooda
Hi Joel,

The mbean approach depends upon storing traffic stats over a period of
time, so it is doable but complex. I am thinking about another approach:
check the log files' timestamps (for producer activity) in conjunction
with the mtime for topic offsets (consumer activity). If both the
timestamp and the mtime are older than 'n' days, that may imply a
defunct topic. Will appreciate your feedback.

Thanks,
Jagbir
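
For what it's worth, a rough Java sketch of the log-directory half of that
heuristic; the log.dir path and the 'n' value are assumptions, and the
consumer-offsets side is not covered.

import java.io.File;

public class DefunctTopicScan {
    public static void main(String[] args) {
        File logDir = new File("/var/kafka-logs");   // assumption: the broker's log.dir
        long cutoff = System.currentTimeMillis() - 30L * 24 * 60 * 60 * 1000; // n = 30 days
        File[] dirs = logDir.listFiles();
        if (dirs == null) return;
        for (File dir : dirs) {
            if (!dir.isDirectory()) continue;
            long newest = 0;
            for (File f : dir.listFiles())
                newest = Math.max(newest, f.lastModified()); // segment/index mtimes
            if (newest < cutoff)
                System.out.println("candidate defunct partition: " + dir.getName());
        }
    }
}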

On Thu, Feb 5, 2015 at 6:57 PM, Joel Koshy jjkosh...@gmail.com wrote:
 There are mbeans
 (http://kafka.apache.org/documentation.html#monitoring) that you can
 poke for incoming message rate - if you look at those over a period of
 time you can figure out which of those are likely to be defunct and
 then delete those topics.

 On Thu, Feb 05, 2015 at 02:38:27PM -0800, Jagbir Hooda wrote:
 First I would like to take this opportunity to thank this group for
 releasing 0.8.2.0. It's a major milestone with a rich set of features.
 Kudos to all the contributors! We are still running 0.8.1.2 and are
 planning to upgrade to 0.8.2.0. While planning this upgrade we
 discovered many topics that are no longer active and best be deleted.
 Since it would be a common task faced by many kafka adopters I thought
 I'd raise it here and seek expert advise. Basically what we desire is
 a utility/script/steps to first identify the defunct topics (let's say
 those topics that haven't seen any traffic in the past 'n' days) and
 then delete them. Will appreciate your response.

 Thanks,
 Jagbir



[DISCUSS] KIP-8 Add a flush method to the new Java producer

2015-02-08 Thread Jay Kreps
Following up on our previous thread on making batch send a little easier,
here is a concrete proposal to add a flush() method to the producer:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

A proposed implementation is here:
https://issues.apache.org/jira/browse/KAFKA-1865

Thoughts?

-Jay


Re: Doubts Kafka

2015-02-08 Thread Christopher Piggott
Have there been any changes with 0.8.2 in how the marker gets moved when
you use the high-level consumer?

One problem I have always had was: what if I pull something from the
stream, but then I have an error in processing it?  I don't really want to
move the marker.

I would almost like the client to have a callback mechanism for processing,
where the marker only gets moved if the high-level consumer successfully
runs my callback/processor (with no exceptions, at least).
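
Something close to that is already possible by disabling auto-commit and
committing only after the handler succeeds. A hedged sketch against the 0.8.x
high-level consumer follows; connection settings and the handler are
assumptions, and note that commitOffsets() checkpoints all partitions the
connector currently owns, not just the one message.

import java.util.Collections;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

public class CommitAfterProcessing {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // assumption
        props.put("group.id", "my-group");                // assumption
        props.put("auto.commit.enable", "false");         // marker moves only when we say so
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        KafkaStream<byte[], byte[]> stream = connector
            .createMessageStreams(Collections.singletonMap("my-topic", 1))
            .get("my-topic").get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> msg = it.next();
            try {
                process(msg.message());       // hypothetical handler
                connector.commitOffsets();    // advance the marker only after success
            } catch (Exception e) {
                // offset not committed; the message is re-read after a restart/rebalance
            }
        }
    }

    static void process(byte[] payload) { /* hypothetical */ }
}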



On Sun, Feb 8, 2015 at 9:49 AM, Gwen Shapira gshap...@cloudera.com wrote:

 On Sun, Feb 8, 2015 at 6:39 AM, Christopher Piggott cpigg...@gmail.com
 wrote:

   The consumer used Zookeeper to store offsets, in 0.8.2 there's an
 option
  to use Kafka itself for that (by setting *offsets.storage = kafka
 
  Does it still really live in zookeeper, and kafka is proxying the
 requests
  through?
 
 
 They don't live in Zookeeper. They live in a secret Kafka topic (__offsets
 or something like that).

 For migration purposes, you can set dual.commit.enable = true and then
 offsets will be stored in both Kafka and ZK, but the intention is to
 migrate to 100% Kafka storage.



  On Sun, Feb 8, 2015 at 9:25 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Hi Eduardo,
  
   1. Why sometimes the applications prefer to connect to zookeeper
 instead
   brokers?
  
   I assume you are talking about the clients and some of our tools?
   These are parts of an older design and we are actively working on
 fixing
   this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an
   option to use Kafka itself for that (by setting *offsets.storage =
  kafka*).
   We are planning on fixing the tools in 0.9, but obviously they are less
   performance sensitive than the consumers.
  
   2. Regarding your tests and disk usage - I'm not sure exactly what
 fills
   your disk - if it's the kafka transaction logs (i.e. log.dir), then we
   expect to store the size of all messages sent times the replication
  factor
   configured for each topic. We keep messages for the amount of time
   specified in *log.retention* parameters. If the disk is filled within
   minutes, either set log.retention.minutes very low (at risk of losing
  data
   if consumers need restart), or make sure your disk capacity matches the
   rates in which producers send data.
  
   Gwen
  
  
   On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia 
   e.costaalf...@unibs.it
wrote:
  
Hi Guys,
   
I have some doubts about the Kafka, the first is Why sometimes the
applications prefer to connect to zookeeper instead brokers?
 Connecting
   to
zookeeper could create an overhead, because we are inserting other
   element
between producer and consumer. Another question is about the
  information
sent by producer, in my tests the producer send the messages to
 brokers
   and
a few minutes my HardDisk is full (my harddisk has 250GB), is there
something to do in the configuration to minimize this?
   
Thanks
--
Informativa sulla Privacy: http://www.unibs.it/node/8155
   
  
 



Fetch size

2015-02-08 Thread CJ Woolard

Hi Kafka team,

We have a use case where we need to consume from ~20 topics (each with 24 
partitions), we have a potential max message size of 20MB so we've set our 
consumer fetch.size to 20MB but that's causing very poor performance on our 
consumer (most of our messages are in the 10-100k range). Is it possible to set 
the fetch size to a lower number than the max message size and gracefully 
handle larger messages (as a trapped exception for example) in order to improve 
our throughput?

Thank you in advance for your help
CJ Woolard

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

2015-02-08 Thread Gwen Shapira
Looks good to me.

I like the idea of not blocking additional sends but not guaranteeing that
flush() will deliver them.

I assume that with linger.ms = 0, flush will just be a noop (since the
queue will be empty). Is that correct?

Gwen

On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Following up on our previous thread on making batch send a little easier,
 here is a concrete proposal to add a flush() method to the producer:


 https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API

 A proposed implementation is here:
 https://issues.apache.org/jira/browse/KAFKA-1865

 Thoughts?

 -Jay



Poor performance consuming multiple topics

2015-02-08 Thread Cj


Hi Kafka team,

We have a use case where we need to consume from ~20 topics (each with 24 
partitions), we have a potential max message size of 20MB so we've set our 
consumer fetch.size to 20MB but that's causing very poor performance on our 
consumer (most of our messages are in the 10-100k range). Is it possible to set 
the fetch size to a lower number than the max message size and gracefully 
handle larger messages (as a trapped exception for example) in order to improve 
our throughput?

Thank you in advance for your help
CJ Woolard

Apache Kafka 0.8.2 Consumer Example

2015-02-08 Thread Ricardo Ferreira
Hi,

I had a client running on Kafka 0.8.1 using the High-Level consumer API
working fine.

Today I updated my Kafka installation to the 0.8.2 version (tried all
versions of Scala) but the consumer doesn't get any messages. I tested
using the kafka-console-consumer.sh utility tool and it works fine; only my
Java program does not.

Did I miss something? I heard that the API changed, so I'd like to know if
someone can share a simple client with me.

Please, respond directly to me or just reply all because I am not currently
subscribed to the group.

Thanks,

Ricardo Ferreira


Re: Apache Kafka 0.8.2 Consumer Example

2015-02-08 Thread Gwen Shapira
I have a 0.8.1 high level consumer working fine with 0.8.2 server. Few of
them actually :)
AFAIK the API did not change.

Do you see any error messages? Do you have timeout configured on the
consumer? What does the offset checker tool say?

On Fri, Feb 6, 2015 at 4:49 PM, Ricardo Ferreira jricardoferre...@gmail.com
 wrote:

 Hi,

 I had a client running on Kafka 0.8.1 using the High-Level consumer API
 working fine.

 Today I updated my Kafka installation for the 0.8.2 version (tried all
 versions of Scala) but the consumer doesn't get any messages. I tested
 using the kafka-console-consumer.sh utility tool and works fine, only my
 Java program that not.

 Did I miss something? I heard that the API changed, so I'd like to know if
 someone can share a simple client with me.

 Please, respond directly to me or just reply all because I am not currently
 subscribed to the group.

 Thanks,

 Ricardo Ferreira



Re: High Latency in Kafka

2015-02-08 Thread Gwen Shapira
I'm wondering how much of the time is spent by Logstash reading and
processing the log vs. time spent sending data to Kafka. Also, I'm not
familiar with Logstash internals; perhaps it can be tuned to send the data
to Kafka in larger batches?

At the moment it's difficult to tell where the slowdown is. More information
about the breakdown of time will help.

You can try Flume's SpoolingDirectory source with Kafka Channel or Sink and
see if you get improved performance out of other tools.


Gwen
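
If the producer side does turn out to be the bottleneck, batching is the
usual lever on the 0.8 producer. A sketch of the relevant settings in Java
follows; the values are illustrative starting points, not recommendations.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class BatchedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");  // assumption
        props.put("producer.type", "async");          // buffer and send in the background
        props.put("batch.num.messages", "500");       // messages per batch (default 200)
        props.put("queue.buffering.max.ms", "100");   // max time to hold a batch
        props.put("compression.codec", "snappy");     // shrink batches on the wire
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("logs", "a log line"));
        producer.close();
    }
}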

On Sun, Feb 8, 2015 at 12:06 AM, Vineet Mishra clearmido...@gmail.com
wrote:

 Hi All,

 I am having some log files of around 30GB, I am trying to event process
 these logs by pushing them to Kafka. I could clearly see the throughput
 achieved while publishing these event to Kafka is quiet slow.

 So as mentioned for the single log file of 30GB, the Logstash is
 continuously emitting to Kafka and it is running from more than 2 days but
 still it has processed just 60% of the log data. I was looking out for a
 way to increase the efficiency of the publishing the event to kafka as with
 this rate of data ingestion I don't think it will be a good option to move
 ahead.

 Looking out for performance improvisation for the same.

 Experts advise required!

 Thanks!



Re: Doubts Kafka

2015-02-08 Thread Christopher Piggott
Sorry, I should have read the release notes before I asked this question.
The answer was in there.

Internally the implementation of the offset storage is just a compacted
 Kafka topic (http://kafka.apache.org/documentation.html#compaction), named
 __consumer_offsets, keyed on the consumer's group, topic, and partition.
 The offset commit request writes the offset to the compacted Kafka topic
 using the highest level of durability guarantee that Kafka provides (
 acks=-1) so that offsets are never lost in the presence of uncorrelated
 failures. Kafka maintains an in-memory view of the latest offset per
 consumer group, topic, partition triplet, so offset fetch requests can be
 served quickly without requiring a full scan of the compacted offsets
 topic. With this feature, consumers can checkpoint offsets very often,
 possibly per message.



On Sun, Feb 8, 2015 at 9:39 AM, Christopher Piggott cpigg...@gmail.com
wrote:

  The consumer used Zookeeper to store offsets, in 0.8.2 there's an option
 to use Kafka itself for that (by setting *offsets.storage = kafka

 Does it still really live in zookeeper, and kafka is proxying the requests
 through?

 On Sun, Feb 8, 2015 at 9:25 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

 Hi Eduardo,

 1. Why sometimes the applications prefer to connect to zookeeper instead
 brokers?

 I assume you are talking about the clients and some of our tools?
 These are parts of an older design and we are actively working on fixing
 this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an
 option to use Kafka itself for that (by setting *offsets.storage =
 kafka*).
 We are planning on fixing the tools in 0.9, but obviously they are less
 performance sensitive than the consumers.

 2. Regarding your tests and disk usage - I'm not sure exactly what fills
  your disk - if it's the kafka transaction logs (i.e. log.dir), then we
  expect to store the size of all messages sent times the replication
  factor
 configured for each topic. We keep messages for the amount of time
 specified in *log.retention* parameters. If the disk is filled within
 minutes, either set log.retention.minutes very low (at risk of losing data
 if consumers need restart), or make sure your disk capacity matches the
 rates in which producers send data.

 Gwen


 On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia 
 e.costaalf...@unibs.it
  wrote:

  Hi Guys,
 
  I have some doubts about the Kafka, the first is Why sometimes the
  applications prefer to connect to zookeeper instead brokers? Connecting
 to
  zookeeper could create an overhead, because we are inserting other
 element
  between producer and consumer. Another question is about the information
  sent by producer, in my tests the producer send the messages to brokers
 and
  a few minutes my HardDisk is full (my harddisk has 250GB), is there
  something to do in the configuration to minimize this?
 
  Thanks
  --
  Informativa sulla Privacy: http://www.unibs.it/node/8155
 





Re: Doubts Kafka

2015-02-08 Thread Christopher Piggott
 The consumer used Zookeeper to store offsets, in 0.8.2 there's an option
to use Kafka itself for that (by setting *offsets.storage = kafka

Does it still really live in zookeeper, and kafka is proxying the requests
through?

On Sun, Feb 8, 2015 at 9:25 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Hi Eduardo,

 1. Why sometimes the applications prefer to connect to zookeeper instead
 brokers?

 I assume you are talking about the clients and some of our tools?
 These are parts of an older design and we are actively working on fixing
 this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an
 option to use Kafka itself for that (by setting *offsets.storage = kafka*).
 We are planning on fixing the tools in 0.9, but obviously they are less
 performance sensitive than the consumers.

 2. Regarding your tests and disk usage - I'm not sure exactly what fills
  your disk - if it's the kafka transaction logs (i.e. log.dir), then we
  expect to store the size of all messages sent times the replication factor
 configured for each topic. We keep messages for the amount of time
 specified in *log.retention* parameters. If the disk is filled within
 minutes, either set log.retention.minutes very low (at risk of losing data
 if consumers need restart), or make sure your disk capacity matches the
 rates in which producers send data.

 Gwen


 On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia 
 e.costaalf...@unibs.it
  wrote:

  Hi Guys,
 
  I have some doubts about the Kafka, the first is Why sometimes the
  applications prefer to connect to zookeeper instead brokers? Connecting
 to
  zookeeper could create an overhead, because we are inserting other
 element
  between producer and consumer. Another question is about the information
  sent by producer, in my tests the producer send the messages to brokers
 and
  a few minutes my HardDisk is full (my harddisk has 250GB), is there
  something to do in the configuration to minimize this?
 
  Thanks
  --
  Informativa sulla Privacy: http://www.unibs.it/node/8155
 



Re: Doubts Kafka

2015-02-08 Thread Gwen Shapira
On Sun, Feb 8, 2015 at 6:39 AM, Christopher Piggott cpigg...@gmail.com
wrote:

  The consumer used Zookeeper to store offsets, in 0.8.2 there's an option
 to use Kafka itself for that (by setting *offsets.storage = kafka

 Does it still really live in zookeeper, and kafka is proxying the requests
 through?


They don't live in Zookeeper. They live in a secret Kafka topic (__offsets
or something like that).

For migration purposes, you can set dual.commit.enable = true and then
offsets will be stored in both Kafka and ZK, but the intention is to
migrate to 100% Kafka storage.
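
As a sketch, the consumer settings involved would be along these lines. The
released key name appears to be dual.commit.enabled; verify against the
0.8.2 consumer config docs.

import java.util.Properties;

public class OffsetStorageConfigSketch {
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("offsets.storage", "kafka");     // commit to Kafka's internal offsets topic
        props.put("dual.commit.enabled", "true");  // also commit to ZooKeeper while migrating
        return props;
    }
}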



 On Sun, Feb 8, 2015 at 9:25 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

  Hi Eduardo,
 
  1. Why sometimes the applications prefer to connect to zookeeper instead
  brokers?
 
  I assume you are talking about the clients and some of our tools?
  These are parts of an older design and we are actively working on fixing
  this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an
  option to use Kafka itself for that (by setting *offsets.storage =
 kafka*).
  We are planning on fixing the tools in 0.9, but obviously they are less
  performance sensitive than the consumers.
 
  2. Regarding your tests and disk usage - I'm not sure exactly what fills
   your disk - if it's the kafka transaction logs (i.e. log.dir), then we
   expect to store the size of all messages sent times the replication
  factor
  configured for each topic. We keep messages for the amount of time
  specified in *log.retention* parameters. If the disk is filled within
  minutes, either set log.retention.minutes very low (at risk of losing
 data
  if consumers need restart), or make sure your disk capacity matches the
  rates in which producers send data.
 
  Gwen
 
 
  On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia 
  e.costaalf...@unibs.it
   wrote:
 
   Hi Guys,
  
   I have some doubts about the Kafka, the first is Why sometimes the
   applications prefer to connect to zookeeper instead brokers? Connecting
  to
   zookeeper could create an overhead, because we are inserting other
  element
   between producer and consumer. Another question is about the
 information
   sent by producer, in my tests the producer send the messages to brokers
  and
   a few minutes my HardDisk is full (my harddisk has 250GB), is there
   something to do in the configuration to minimize this?
  
   Thanks
   --
   Informativa sulla Privacy: http://www.unibs.it/node/8155
  
 



Re: Doubts Kafka

2015-02-08 Thread Gwen Shapira
Hi Eduardo,

1. Why sometimes the applications prefer to connect to zookeeper instead
brokers?

I assume you are talking about the clients and some of our tools?
These are parts of an older design and we are actively working on fixing
this. The consumer used Zookeeper to store offsets, in 0.8.2 there's an
option to use Kafka itself for that (by setting *offsets.storage = kafka*).
We are planning on fixing the tools in 0.9, but obviously they are less
performance sensitive than the consumers.

2. Regarding your tests and disk usage - I'm not sure exactly what fills
your disk - if it's the kafka transaction logs (i.e. log.dir), then we
expect to store the size of all messages sent times the replication factor
configured for each topic. We keep messages for the amount of time
specified in *log.retention* parameters. If the disk is filled within
minutes, either set log.retention.minutes very low (at risk of losing data
if consumers need restart), or make sure your disk capacity matches the
rates in which producers send data.

Gwen


On Sat, Feb 7, 2015 at 3:01 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it
 wrote:

 Hi Guys,

 I have some doubts about the Kafka, the first is Why sometimes the
 applications prefer to connect to zookeeper instead brokers? Connecting to
 zookeeper could create an overhead, because we are inserting other element
 between producer and consumer. Another question is about the information
 sent by producer, in my tests the producer send the messages to brokers and
 a few minutes my HardDisk is full (my harddisk has 250GB), is there
 something to do in the configuration to minimize this?

 Thanks
 --
 Informativa sulla Privacy: http://www.unibs.it/node/8155



Re: Apache Kafka 0.8.2 Consumer Example

2015-02-08 Thread Ricardo Ferreira
Hi Gwen,

Sorry, both the consumer and the broker are 0.8.2?
A = Yes

So what's on 0.8.1?
A = It works fine using 0.8.1 for server AND client.

You probably know the consumer group of your application. Can you use the
offset checker tool on that?
A = Yes, I know from the consumer, and the offset checker gave me nothing
about that group.

Thanks,

Ricardo

On Sun, Feb 8, 2015 at 1:19 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Sorry, both the consumer and the broker are 0.8.2?

 So what's on 0.8.1?

 I seriously doubt downgrading is the solution.

 You probably know the consumer group of your application. Can you use the
 offset checker tool on that?

 Gwen

 On Sun, Feb 8, 2015 at 9:01 AM, Ricardo Ferreira 
 jricardoferre...@gmail.com wrote:

 Hi Gwen,

 Thanks for the response.

 In my case, I have both consumer application and the server versions in
 0.8.2, Scala 2.10.

 No errors are thrown, and my *zookeeper.session.timeout.ms* property is
 set to 500, although I tried 5000 and that also didn't work.

 I checked the offset checker tool, but it asks for a group and I don't
 know which group the kafka-console-producer is using. I tried the
 consumer group but it gave the following message:

 Exiting due to: org.apache.zookeeper.KeeperException$NoNodeException:
 KeeperErrorCode = NoNode for /consumers/test-consumer-group/offsets/test/0.

 Perhaps the solution is to downgrade the consumer libs to 0.8.1?

 Thanks,

 Ricardo

 On Sun, Feb 8, 2015 at 11:27 AM, Gwen Shapira gshap...@cloudera.com
 wrote:

 I have a 0.8.1 high level consumer working fine with 0.8.2 server. Few
 of them actually :)
 AFAIK the API did not change.

 Do you see any error messages? Do you have timeout configured on the
 consumer? What does the offset checker tool say?

 On Fri, Feb 6, 2015 at 4:49 PM, Ricardo Ferreira 
 jricardoferre...@gmail.com wrote:

 Hi,

 I had a client running on Kafka 0.8.1 using the High-Level consumer API
 working fine.

 Today I updated my Kafka installation for the 0.8.2 version (tried all
 versions of Scala) but the consumer doesn't get any messages. I tested
 using the kafka-console-consumer.sh utility tool and works fine, only my
 Java program that not.

 Did I miss something? I heard that the API changed, so I'd like to know
 if
 someone can share a simple client with me.

 Please, respond directly to me or just reply all because I am not
 currently
 subscribed to the group.

 Thanks,

 Ricardo Ferreira







Re: Console Producer Throwing LeaderNotAvailableException Despite Existing Leader for Partition

2015-02-08 Thread Alex Melville
Thanks Tao,

That fixed the problem. Console producer now correctly pushes to the topic
and console consumer can read the topic data.

-Alex

On Sun, Feb 8, 2015 at 1:47 AM, tao xiao xiaotao...@gmail.com wrote:

 Alex,

 I got similar error before due to incorrect network binding of my laptop's
 wireless interface. You can try with setting advertised.host.name=kafka's
 server hostname in the server.properties and run it again.

 On Sun, Feb 8, 2015 at 8:38 AM, Alex Melville amelvi...@g.hmc.edu wrote:

  Howdy all,
 
  I recently upgraded to Kafka 0.8.2.0 and am trying to verify that
  everything still works as expected. I spin up two brokers, one zk
 instance,
  and then create a topic using
 
  kafka-topics.sh --create --zookeeper ad-0104:2181 --topic deleteme
  --partitions 2 --replication-factor 1
 
  Then I run --describe to check if the partitions have leaders. I get
 
 
  kafka-topics.sh --describe --zookeeper ad-0104:2181 --topic deleteme
 
  Topic:deleteme PartitionCount:2 ReplicationFactor:1 Configs:
  Topic: deleteme Partition: 0 Leader: 0 Replicas: 0 Isr: 0
  Topic: deleteme Partition: 1 Leader: 1 Replicas: 1 Isr: 1
 
 
  Finally, I run the console producer
 
  kafka-console-producer.sh --broker-list ad-0102:9092 --topic deleteme
 
  I get the following warning
 
  [2015-02-08 00:36:24,244] WARN Property topic is not valid
  (kafka.utils.VerifiableProperties)
 
  and then it waits for console input. When I try to send a message I get
 the
  following list of error messages
 
  [2015-02-08 00:37:04,735] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:04,751] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:04,752] ERROR Failed to collate messages by topic,
  partition due to: Failed to fetch topic metadata for topic: deleteme
  (kafka.producer.async.DefaultEventHandler)
  [2015-02-08 00:37:04,859] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:04,863] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:04,863] ERROR Failed to collate messages by topic,
  partition due to: Failed to fetch topic metadata for topic: deleteme
  (kafka.producer.async.DefaultEventHandler)
  [2015-02-08 00:37:04,968] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:04,974] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:04,974] ERROR Failed to collate messages by topic,
  partition due to: Failed to fetch topic metadata for topic: deleteme
  (kafka.producer.async.DefaultEventHandler)
  [2015-02-08 00:37:05,079] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:05,084] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   (kafka.producer.BrokerPartitionInfo)
  [2015-02-08 00:37:05,084] ERROR Failed to collate messages by topic,
  partition due to: Failed to fetch topic metadata for topic: deleteme
  (kafka.producer.async.DefaultEventHandler)
  [2015-02-08 00:37:05,189] WARN Error while fetching metadata
  [{TopicMetadata for topic deleteme -
  No partition metadata for topic deleteme due to
  kafka.common.LeaderNotAvailableException}] for topic [deleteme]: class
  kafka.common.LeaderNotAvailableException
   

Re: [DISCUSS] KIP-8 Add a flush method to the new Java producer

2015-02-08 Thread Jay Kreps
Well actually in the case of linger.ms = 0 the send is still asynchronous
so calling flush() blocks until all the previously sent records have
completed. It doesn't speed anything up in that case, though, since they
are already available to send.

-Jay

On Sun, Feb 8, 2015 at 10:36 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Looks good to me.

 I like the idea of not blocking additional sends but not guaranteeing that
 flush() will deliver them.

 I assume that with linger.ms = 0, flush will just be a noop (since the
 queue will be empty). Is that correct?

 Gwen

 On Sun, Feb 8, 2015 at 10:25 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Following up on our previous thread on making batch send a little easier,
  here is a concrete proposal to add a flush() method to the producer:
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/KIP-8+-+Add+a+flush+method+to+the+producer+API
 
  A proposed implementation is here:
  https://issues.apache.org/jira/browse/KAFKA-1865
 
  Thoughts?
 
  -Jay