Re: Horizontally Scaling Kafka Consumers

2015-04-30 Thread Joe Stein
The Go Kafka Client also supports offset storage in ZK and Kafka
https://github.com/stealthly/go_kafka_client/blob/master/docs/offset_storage.md
and has two other strategies for partition ownership with a consensus
server (currently uses Zookeeper; Consul will be implemented in the near
future).

~ Joestein

On Thu, Apr 30, 2015 at 2:15 AM, Nimi Wariboko Jr n...@channelmeter.com
wrote:

 My mistake, it seems the Java drivers are a lot more advanced than
 Shopify's Kafka driver (or I am missing something) - and I haven't used
 Kafka before.

 With the Go driver - it seems you have to manage offsets and partitions
 within the application code, while in Scala driver it seems you have the
 option of simply subscribing to a topic, and someone else will manage that
 part.

 After digging around a bit more, I found there is another library -
 https://github.com/wvanbergen/kafka - that speaks the consumergroup API
 and
 accomplishes what I was looking for and I assume is implemented by keeping
 track of memberships w/ Zookeeper.

 Thank you for the information - it really helped clear up what I was failing to
 understand with Kafka.

 Nimi

 On Wed, Apr 29, 2015 at 10:10 PM, Joe Stein joe.st...@stealth.ly wrote:

  You can do this with the existing Kafka Consumer
 
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106
  and probably any other Kafka client too (maybe with minor/major rework
  to do the offset management).
 
  The new consumer approach is more transparent on Subscribing To Specific
  Partitions
 
 
 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
  .
 
  Here is a Docker file (** pull request pending **) for wrapping kafka
  consumers (doesn't have to be the go client, need to abstract that out
 some
  more after more testing)
 
 
 https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile
 
 
  Also a VM (** pull request pending **) to build container, push to local
  docker repository and launch on Apache Mesos
 
 
 https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant
  as a working example of how to do it.
 
  All of this could be done without the Docker container and still work on
  Mesos ... or even without Mesos and on YARN.
 
  You might also want to check out how Samza integrates with Execution
  Frameworks
 
 
 http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html
  which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375
  and
  built in YARN support.
 
  ~ Joe Stein
  - - - - - - - - - - - - - - - - -
 
http://www.stealth.ly
  - - - - - - - - - - - - - - - - -
 
  On Wed, Apr 29, 2015 at 8:56 AM, David Corley davidcor...@gmail.com
  wrote:
 
   You're right Stevo, I should re-phrase to say that there can be no more
   _active_ consumers than there are partitions (within a single consumer
   group).
   I'm guessing that's what Nimi is alluding to asking, but perhaps he can
   elaborate on whether he's using consumer groups and/or whether the 100
   partitions are all for a single topic, or multiple topics.
  
   On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote:
  
Please correct me if wrong, but I think it is really not a hard
  constraint
that one cannot have more consumers (from same group) than partitions
  on
single topic - all the surplus consumers will not be assigned to
  consume
any partition, but they can be there and as soon as one active
 consumer
from same group goes offline (its connection to ZK is dropped),
  consumers
from the group will be rebalanced so one passively waiting consumer
  will
become active.
   
Kind regards,
Stevo Slavic.
   
On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com
 
wrote:
   
 If the 100 partitions are all for the same topic, you can have up
 to
   100
 consumers working as part of a single consumer group for that
 topic.
 You cannot have more consumers than there are partitions within a
  given
 consumer group.

 On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com
 
wrote:

  Hi,
 
  I was wondering what options there are for horizontally scaling
  kafka
  consumers? Basically if I have 100 partitions and 10 consumers,
 and
want
 to
  temporarily scale up to 50 consumers, what options do I have?
 
  So far I've thought of just simply tracking consumer membership
   somehow
  (either through Raft or zookeeper's znodes) on the consumers.
 

   
  
 



Re: Kafka 0.8.2 beta - release

2015-04-30 Thread Ewen Cheslack-Postava
That's part of the new consumer API that hasn't been released yet. The API
happens to be included in the 0.8.2.* artifacts because it is under
development, but isn't yet released -- it hasn't been mentioned in the
release notes, nor is it in the official documentation:
http://kafka.apache.org/documentation.html

That API is currently under active development and should be available in
the next release. If you want to test it out, you can build a copy of trunk
yourself, but the high-level consumer functionality is not yet implemented,
so it likely does not include everything you want. For the time being, you
probably want to use the existing high level consumer API:
http://kafka.apache.org/documentation.html#highlevelconsumerapi
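
For reference, a minimal sketch of that high level consumer (the group id,
topic name and ZooKeeper address below are placeholders; error handling and
shutdown are omitted):

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class HighLevelConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder ZK address
        props.put("group.id", "my-group");                // placeholder group id
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for one stream on the topic; partition assignment, rebalancing and
        // offset management are handled by the consumer itself.
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("my-topic", 1));

        ConsumerIterator<byte[], byte[]> it = streams.get("my-topic").get(0).iterator();
        while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
        }
    }
}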


On Wed, Apr 29, 2015 at 11:07 PM, Gomathivinayagam Muthuvinayagam 
sankarm...@gmail.com wrote:

 Thank you,

 It seems the following methods are not supported in KafkaConsumer. Do you
 know when they will be supported?

 public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean
 sync) {

 throw new UnsupportedOperationException();

 }

 Thanks & Regards,



 On Wed, Apr 29, 2015 at 10:52 PM, Ewen Cheslack-Postava e...@confluent.io
 
 wrote:

  It has already been released, including a minor revision to fix some
  critical bugs. The latest release is 0.8.2.1. The downloads page has
 links
  and release notes: http://kafka.apache.org/downloads.html
 
  On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam 
  sankarm...@gmail.com wrote:
 
  I see a lot of interesting features with Kafka 0.8.2 beta. I am just
   wondering when that will be released. Is there any timeline for that?
  
  Thanks & Regards,
  
 
 
 
  --
  Thanks,
  Ewen
 




-- 
Thanks,
Ewen


Re: MultiThreaded HLConsumer Exits before events are all consumed

2015-04-30 Thread christopher palm
What I found were two problems:
1. The producer wasn't passing in a partition key, so not all partitions
were getting data.
2. After fixing the producer, I could see all threads getting data
consistently; the shutdown method was then clearly killing the threads.
I have removed the shutdown, and with the producer changes sending in a key,
this looks like it is running correctly now.
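
For anyone hitting the same thing, a rough sketch of the producer change using
the old 0.8.x producer API (the broker list, topic name and key scheme here are
only illustrative):

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class KeyedProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // illustrative brokers
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        // Supplying a key lets the default partitioner hash messages across all
        // partitions (the same key always lands on the same partition).
        for (int i = 0; i < 100; i++) {
            String key = Integer.toString(i);
            producer.send(new KeyedMessage<String, String>("kafkatopicRep4Part4", key, "event " + i));
        }
        producer.close();
    }
}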

Thanks!

On Wed, Apr 29, 2015 at 10:59 PM, tao xiao xiaotao...@gmail.com wrote:

 The log suggests that the shutdown method was still called

 Thread 0: 2015-04-29
 12:55:54.292|3|13|Normal|-74.1892627|41.33900999753

 Last Shutdown via example.shutDown called!

 15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
 ZKConsumerConnector shutting down

 Please ensure no consumer.shutdown(); and  executor.shutdown(); are called
 during the course of your program

 On Thu, Apr 30, 2015 at 2:23 AM, christopher palm cpa...@gmail.com
 wrote:

  Commenting out Example shutdown did not seem to make a difference, I
 added
  the print statement below to highlight the fact.
 
  The other threads still shut down, and only one thread lives on,
 eventually
  that dies after a few minutes as well
 
  Could this be that the producer default partitioner isn't balancing
 data
  across all partitions?
 
  Thanks,
  Chris
 
  Thread 0: 2015-04-29
  12:55:54.292|3|13|Normal|-74.1892627|41.33900999753
 
  Last Shutdown via example.shutDown called!
 
  15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:,
  ZKConsumerConnector shutting down
 
  15/04/29 13:09:38 INFO utils.KafkaScheduler: Forcing shutdown of Kafka
  scheduler
 
  15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
  [ConsumerFetcherManager-1430330968420] Stopping leader finder thread
 
  15/04/29 13:09:38 INFO
 consumer.ConsumerFetcherManager$LeaderFinderThread:
  -leader-finder-thread], Shutting down
 
  15/04/29 13:09:38 INFO
 consumer.ConsumerFetcherManager$LeaderFinderThread:
  -leader-finder-thread], Stopped
 
  15/04/29 13:09:38 INFO
 consumer.ConsumerFetcherManager$LeaderFinderThread:
  -leader-finder-thread], Shutdown completed
 
  15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
  [ConsumerFetcherManager-1430330968420] Stopping all fetchers
 
  15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
  [ConsumerFetcherThread-consumergroup], Shutting down
 
  15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
  [ConsumerFetcherThread-], Stopped
 
  15/04/29 13:09:38 INFO consumer.ConsumerFetcherThread:
  [ConsumerFetcherThread-], Shutdown completed
 
  15/04/29 13:09:38 INFO consumer.ConsumerFetcherManager:
  [ConsumerFetcherManager-] All connections stopped
 
  15/04/29 13:09:38 INFO zkclient.ZkEventThread: Terminate ZkClient event
  thread.
 
  Shutting down Thread: 2
 
  Shutting down Thread: 1
 
  Shutting down Thread: 3
 
  15/04/29 13:09:38 INFO consumer.ZookeeperConsumerConnector:
  [consumergroup], ZKConsumerConnector shut down completed
 
  Thread 0: 2015-04-29 12:55:55.302|4|14|Unsafe tail
  distance|-73.99021500035|40.6636611
 
  15/04/29 13:09:39 INFO consumer.ZookeeperConsumerConnector:
  [consumergroup], stopping watcher executor thread for consumer
  consumergroup
 
  Thread 0: 2015-04-29
  12:55:56.313|1|11|Normal|-79.74165300042|42.1304580009
 
  On Wed, Apr 29, 2015 at 10:11 AM, tao xiao xiaotao...@gmail.com wrote:
 
   example.shutdown(); in ConsumerGroupExample closes all consumer
  connections
   to Kafka. remove this line the consumer threads will run forever
  
   On Wed, Apr 29, 2015 at 9:42 PM, christopher palm cpa...@gmail.com
   wrote:
  
Hi All,
   
I am trying to get a multi threaded HL consumer working against a 2
   broker
Kafka cluster with a 4 partition 2 replica  topic.
   
The consumer code is set to run with 4 threads, one for each
 partition.
   
The producer code uses the default partitioner and loops indefinitely
feeding events into the topic.(I excluded the while loop in the paste
below)
   
What I see is that the threads eventually all exit, even though the
  producer
   is
still sending events into the topic.
   
My understanding is that the consumer thread per partition is the
  correct
setup.
   
Any ideas why this code doesn't continue to consume events as they
 are
pushed to the topic?
   
I suspect I am configuring something wrong here, but am not sure
 what.
   
Thanks,
   
Chris
   
   
*T**opic Configuration*
   
Topic:kafkatopicRep4Part4 PartitionCount:4 ReplicationFactor:2
 Configs:
   
Topic: kafkatopicRep4Part4 Partition: 0 Leader: 1 Replicas: 2,1 Isr:
  1,2
   
Topic: kafkatopicRep4Part4 Partition: 1 Leader: 1 Replicas: 1,2 Isr:
  1,2
   
Topic: kafkatopicRep4Part4 Partition: 2 Leader: 1 Replicas: 2,1 Isr:
  1,2
   
 Topic: kafkatopicRep4Part4 Partition: 3 Leader: 1 Replicas: 1,2 Isr:
  1,2
   
*Producer Code:*
   
 Properties props = new Properties();
   
 

Re: Horizontally Scaling Kafka Consumers

2015-04-30 Thread Evan Huus
On Thu, Apr 30, 2015 at 2:15 AM, Nimi Wariboko Jr n...@channelmeter.com
wrote:

 My mistake, it seems the Java drivers are a lot more advanced than
 Shopify's Kafka driver (or I am missing something) - and I haven't used
 Kafka before.

 With the Go driver - it seems you have to manage offsets and partitions
 within the application code, while in Scala driver it seems you have the
 option of simply subscribing to a topic, and someone else will manage that
 part.

 After digging around a bit more, I found there is another library -
 https://github.com/wvanbergen/kafka - that speaks the consumergroup API
 and
 accomplishes what I was looking for and I assume is implemented by keeping
 track of memberships w/ Zookeeper.


Yes. That library is built on top of Sarama (Shopify's Go Kafka driver),
and it's on our roadmap to integrate it properly. As far as I know, this is
the only major area where Sarama is lagging behind the JVM client.



 Thank you for the information - it really helped clear up what I was failing to
 understand with Kafka.

 Nimi

 On Wed, Apr 29, 2015 at 10:10 PM, Joe Stein joe.st...@stealth.ly wrote:

  You can do this with the existing Kafka Consumer
 
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106
  and probably any other Kafka client too (maybe with minor/major rework
  to do the offset management).
 
  The new consumer approach is more transparent on Subscribing To Specific
  Partitions
 
 
 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
  .
 
  Here is a Docker file (** pull request pending **) for wrapping kafka
  consumers (doesn't have to be the go client, need to abstract that out
 some
  more after more testing)
 
 
 https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile
 
 
  Also a VM (** pull request pending **) to build container, push to local
  docker repository and launch on Apache Mesos
 
 
 https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant
  as a working example of how to do it.
 
  All of this could be done without the Docker container and still work on
  Mesos ... or even without Mesos and on YARN.
 
  You might also want to check out how Samza integrates with Execution
  Frameworks
 
 
 http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html
  which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375
  and
  built in YARN support.
 
  ~ Joe Stein
  - - - - - - - - - - - - - - - - -
 
http://www.stealth.ly
  - - - - - - - - - - - - - - - - -
 
  On Wed, Apr 29, 2015 at 8:56 AM, David Corley davidcor...@gmail.com
  wrote:
 
   You're right Stevo, I should re-phrase to say that there can be no more
   _active_ consumers than there are partitions (within a single consumer
   group).
   I'm guessing that's what Nimi is alluding to asking, but perhaps he can
   elaborate on whether he's using consumer groups and/or whether the 100
   partitions are all for a single topic, or multiple topics.
  
   On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote:
  
Please correct me if wrong, but I think it is really not a hard
  constraint
that one cannot have more consumers (from same group) than partitions
  on
single topic - all the surplus consumers will not be assigned to
  consume
any partition, but they can be there and as soon as one active
 consumer
from same group goes offline (its connection to ZK is dropped),
  consumers
from the group will be rebalanced so one passively waiting consumer
  will
become active.
   
Kind regards,
Stevo Slavic.
   
On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com
 
wrote:
   
 If the 100 partitions are all for the same topic, you can have up
 to
   100
 consumers working as part of a single consumer group for that
 topic.
 You cannot have more consumers than there are partitions within a
  given
 consumer group.

 On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com
 
wrote:

  Hi,
 
  I was wondering what options there are for horizontally scaling
  kafka
  consumers? Basically if I have 100 partitions and 10 consumers,
 and
want
 to
  temporarily scale up to 50 consumers, what options do I have?
 
  So far I've thought of just simply tracking consumer membership
   somehow
  (either through Raft or zookeeper's znodes) on the consumers.
 

   
  
 



Sometimes I don't get a leader with 1 broker

2015-04-30 Thread Dillian Murphey
Running a 1 broker system.  I had some issues with the system but got it
working. I've deleted the topic I had trouble with and re-created it.

But describing the topic shows no leader, and neither producing nor consuming works on it.

I create a brand new topic with a name I never used before and I get a
leader.  I think I sometimes get a leader and sometimes don't. Not sure.

controller log is fine.

Is this normal?


Kafka still aware of old zookeeper nodes

2015-04-30 Thread Dillian Murphey
I had 3 zookeeper nodes. I added 3 new ones and shut down the old 3.

The server.log shows "Closing socket connection" errors to the old IPs. I
rebooted the kafka server entirely but it still somehow seems aware of
these servers.

Any ideas what's up?


Re: Kafka still aware of old zookeeper nodes

2015-04-30 Thread svante karlsson
Have you changed

zookeeper.connect=

in server.properties?
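
For example (hostnames are placeholders), the broker's server.properties should
list the new ensemble explicitly:

zookeeper.connect=zk-new-1:2181,zk-new-2:2181,zk-new-3:2181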

A better procedure for replacing ZooKeeper nodes would be to shut down one
and install the new one with the same IP. This can easily be done on a
running cluster.

/svante

2015-04-30 20:08 GMT+02:00 Dillian Murphey crackshotm...@gmail.com:

 I had 3 zookeeper nodes. I added 3 new ones and shut down the old 3.

 The server.log shows "Closing socket connection" errors to the old IPs. I
 rebooted the kafka server entirely but it still somehow seems aware of
 these servers.

 Any ideas what's up?



Re: Horizontally Scaling Kafka Consumers

2015-04-30 Thread Sharninder
You need to first decide the conditions that need to be met for you to
scale to 50 consumers. These can be as simple as the consumer lag. Look at
the console offset checker tool and see if any of those numbers make sense.
Your existing consumers could also produce some metrics based on which
another process will decide when to spawn new consumers.
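
For example, something along these lines shows per-partition lag (flag names
vary a little between versions, and the group/topic names are placeholders):

bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --group my-group --topic my-topic

The lag column (log end offset minus last committed offset) is the number you
would threshold on before spawning extra consumers.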

--
Sharninder


On Wed, Apr 29, 2015 at 11:58 PM, Nimi Wariboko Jr n...@channelmeter.com
wrote:

 Hi,

 I was wondering what options there are/what other people are doing for
 horizontally scaling kafka consumers? Basically if I have 100 partitions
 and 10 consumers, and want to temporarily scale up to 50 consumers, what
 can I do?

 So far I've thought of just simply tracking consumer membership somehow
 (either through zookeeper's ephemeral nodes or maybe using gossip) on the
 consumers to achieve consensus on who consumes what. Another option would
 be having a router, possibly using something like nsq (I understand that
 they are similar pieces of software, but what we are going for is a
 persistent distributed queue (sharding) which is why I'm looking into
 Kafka)?




-- 
--
Sharninder


Delete topic / Recreate = No leader

2015-04-30 Thread Dillian Murphey
I am trying to reproduce this. But if I create a topic, then delete it,
then re-create it, no leader is getting assigned.

I can still produce/consume messages (via command line, basic testing).

Is there some additional cleanup I need to do?

Thanks for your time!


Re: Kafka still aware of old zookeeper nodes

2015-04-30 Thread Dillian Murphey
Not sure if this is the best way to do this, but my zookeeper.connect is
set to a DNS alias which points to a load balancer for 3 zookeeper nodes.

I was trying this to see if I could keep the Kafka config dynamic and allow
me to change/scale whatever I wanted with ZooKeeper and not have to ever
mess with the config for Kafka.

Thanks for your comments.

On Thu, Apr 30, 2015 at 11:35 AM, svante karlsson s...@csi.se wrote:

 Have you changed

 zookeeper.connect=

 in server.properties.

 A better procedure for replacing ZooKeeper nodes would be to shut down one
 and install the new one with the same IP. This can easily be done on a
 running cluster.

 /svante

 2015-04-30 20:08 GMT+02:00 Dillian Murphey crackshotm...@gmail.com:

  I had 3 zookeeper nodes. I added 3 new ones and shut down the old 3.
 
  The server.log shows "Closing socket connection" errors to the old IPs. I
  rebooted the kafka server entirely but it still somehow seems aware of
  these servers.
 
  Any ideas what's up?
 



Re: New Producer API - batched sync mode support

2015-04-30 Thread Ivan Balashov
2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava e...@confluent.io:

 They aren't going to get this anyway (as Jay pointed out) given the current
 broker implementation


Is it also incorrect to assume atomicity even if all messages in the batch
go to the same partition?


Re: New Producer API - batched sync mode support

2015-04-30 Thread Gwen Shapira
Why do we think atomicity is expected, if the old API we are emulating here
lacks atomicity?

I don't remember emails to the mailing list saying: "I expected this batch
to be atomic, but instead I got duplicates when retrying after a failed
batch send."
Maybe atomicity isn't as strong a requirement as we believe? That is,
everyone expects some duplicates during failure events and handles them
downstream?



On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov ibalas...@gmail.com wrote:

 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava e...@confluent.io:

  They aren't going to get this anyway (as Jay pointed out) given the
 current
  broker implementation
 

 Is it also incorrect to assume atomicity even if all messages in the batch
 go to the same partition?



Re: Data replication and zero data loss

2015-04-30 Thread Jiangjie Qin
Which mirror maker version did you look at? The MirrorMaker in trunk
should not have data loss if you just use the default setting.

On 4/30/15, 7:53 PM, Joong Lee jo...@me.com wrote:

Hi,
We are exploring Kafka to keep two data centers (primary and DR) running
hosts of elastic search nodes in sync. One key requirement is that we
can't lose any data. We POC'd use of MirrorMaker and felt it may not meet
 our data loss requirement.

 I would like to ask the community if we should look for another solution or
would Kafka be the right solution considering zero data loss requirement.

Thanks



RE: Java Consumer API

2015-04-30 Thread Aditya Auradkar
It'll be officially ready only in version 0.9. 

Aditya


From: Mohit Gupta [success.mohit.gu...@gmail.com]
Sent: Thursday, April 30, 2015 8:58 PM
To: users@kafka.apache.org
Subject: Java Consumer API

Hello,

Kafka documentation ( http://kafka.apache.org/documentation.html#producerapi
) suggests using only the Producer from kafka-clients ( 0.8.2.0 ) and using the
Consumer from the packaged Scala client. I just want to check once whether the
Consumer API from the new client is ready for production use.


--
Best Regards,
Mohit


Re: New Producer API - batched sync mode support

2015-04-30 Thread Jiangjie Qin
Roshan,

If I understand correctly, you just want to make sure a number of messages
have been sent successfully. Using a callback might be easier to do that.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class MyCallback implements Callback {
    // Callbacks run on the producer I/O thread, so use a synchronized set.
    private final Set<RecordMetadata> failedSend =
        Collections.synchronizedSet(new HashSet<RecordMetadata>());

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            failedSend.add(metadata);
    }

    public boolean hasFailure() { return failedSend.size() > 0; }
}

In the main code, you just need to do the following:

MyCallback callback = new MyCallback();
for (ProducerRecord record : records)
    producer.send(record, callback);

producer.flush();
if (callback.hasFailure()) {
    // do something
}

This will avoid looping over all the futures and provide you pretty much the
same guarantee as the old producer, if not better.

Jiangjie (Becket) Qin


On 4/30/15, 4:54 PM, Roshan Naik ros...@hortonworks.com wrote:

@Gwen, @Ewen,
  While atomicity of a batch is nice to have, it is not essential. I don't
think users always expect such atomicity. Atomicity is not even guaranteed
in many un-batched systems let alone batched systems.

As long as the client gets informed about the ones that failed in the
batch.. that would suffice.

One issue with the current flush() based batch-sync implementation is that
the client needs to iterate over *all* futures in order to scan for any
failed messages. In the common case, it is just wasted CPU cycles as there
won't be any failures. Would be ideal if the client is informed about only
problematic messages.

  IMO, adding a new send(batch) API may be meaningful if it can provide
benefits beyond what user can do with a simple wrapper on existing stuff.
For example: eliminate the CPU cycles wasted on examining results from
successful message deliveries, or other efficiencies.



@Ivan,
   I am not certain, I am thinking that there is a possibility that the
first few messages of the batch got accepted, but not the remainder ? At
the same time based on some comments made earlier it appears underlying
implementation does have an all-or-none mechanism for a batch going to a
partition.
For simplicity, streaming clients may not want to deal explicitly with
partitions (and get exposed to repartitioning & leader change type issues)

-roshan



On 4/30/15 2:07 PM, Gwen Shapira gshap...@cloudera.com wrote:

Why do we think atomicity is expected, if the old API we are emulating
here
lacks atomicity?

I don't remember emails to the mailing list saying: I expected this
batch
to be atomic, but instead I got duplicates when retrying after a failed
batch send.
Maybe atomicity isn't as strong a requirement as we believe? That is,
everyone expects some duplicates during failure events and handles them
downstream?



On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov ibalas...@gmail.com
wrote:

 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava e...@confluent.io:

  They aren't going to get this anyway (as Jay pointed out) given the
 current
  broker implementation
 

 Is it also incorrect to assume atomicity even if all messages in the
batch
 go to the same partition?





Re: Data replication and zero data loss

2015-04-30 Thread Daniel Compton
When we evaluated MirrorMaker last year we didn't find any risk of data
loss, only duplicate messages in the case of a network partition.

Did you discover data loss in your tests, or were you just looking at the
docs?
On Fri, 1 May 2015 at 4:31 pm Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Which mirror maker version did you look at? The MirrorMaker in trunk
 should not have data loss if you just use the default setting.

 On 4/30/15, 7:53 PM, Joong Lee jo...@me.com wrote:

 Hi,
 We are exploring Kafka to keep two data centers (primary and DR) running
 hosts of elastic search nodes in sync. One key requirement is that we
 can't lose any data. We POC'd use of MirrorMaker and felt it may not meet
 our data loss requirement.
 
 I would like to ask the community if we should look for another solution or
 would Kafka be the right solution considering zero data loss requirement.
 
 Thanks




Java Consumer API

2015-04-30 Thread Mohit Gupta
Hello,

Kafka documentation ( http://kafka.apache.org/documentation.html#producerapi
) suggests using only the Producer from kafka-clients ( 0.8.2.0 ) and using the
Consumer from the packaged Scala client. I just want to check once whether the
Consumer API from the new client is ready for production use.


-- 
Best Regards,
Mohit


Pulling Snapshots from Kafka, Log compaction last compact offset

2015-04-30 Thread Jan Filipiak

Hello Everyone,

I am quite excited about the recent example of replicating PostgreSQL 
changes to Kafka. My view on the log compaction feature had always been 
a very sceptical one, but now, with its great potential exposed to the 
wide public, I think it's an awesome feature. Especially when pulling 
this data into HDFS as a snapshot, it is (IMO) a Sqoop killer. So I want 
to thank everyone who had the vision of building these kinds of systems 
during a time when I could not imagine them.


There is one open question that I would like people to help me with. 
When pulling a snapshot of a partition into HDFS using a Camus-like 
application, I feel the need to keep a Set of all keys read so far and 
stop as soon as I find a key that is already in my set. I use this as an 
indicator of how far the log compaction has already happened and only 
pull up to this point. This works quite well as I do not need to keep 
the messages but only their keys in memory.
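
To make the stopping rule concrete, here is a rough sketch against the 0.8 
high level consumer iterator (string keys and the writeToHdfs sink are 
assumptions; a real job would also record the offset at which it stopped):

import java.util.HashSet;
import java.util.Set;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

public class SnapshotPuller {

    // Consume until a key is seen for the second time: on a compacted topic
    // that means we have passed the point up to which compaction has run.
    static void pullSnapshot(KafkaStream<byte[], byte[]> stream) {
        Set<String> seenKeys = new HashSet<String>();
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> record = it.next();
            String key = new String(record.key());   // assumes string keys
            if (!seenKeys.add(key)) {
                break;                                // repeated key: stop here
            }
            writeToHdfs(key, record.message());       // hypothetical HDFS sink
        }
    }

    static void writeToHdfs(String key, byte[] value) {
        // stand-in for whatever sink the snapshot job uses
    }
}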


The question I want to raise with the community is:

How do you prevent pulling the same record twice (in different versions) 
and would it be beneficial if the OffsetResponse would also return the 
last offset that got compacted so far and the application would just 
pull up to this point?


Looking forward for some recommendations and comments.

Best
Jan



Data replication and zero data loss

2015-04-30 Thread Joong Lee
Hi,
We are exploring Kafka to keep two data centers (primary and DR) running hosts 
of Elasticsearch nodes in sync. One key requirement is that we can't lose any 
data. We POC'd use of MirrorMaker and felt it may not meet our data loss 
requirement.

I would like to ask the community whether we should look for another solution or 
whether Kafka would be the right solution considering the zero data loss requirement.

Thanks

Re: Leaderless topics

2015-04-30 Thread Gwen Shapira
Which Kafka version are you using?

On Thu, Apr 30, 2015 at 4:11 PM, Dillian Murphey crackshotm...@gmail.com
wrote:

 Scenario with a 1-node broker and a 3-node zookeeper ensemble.

 1) Create topic
 2) Delete topic
 3) Re-create with same name

 I'm noticing this recreation gives me Leader: none, and Isr: empty.

 Any ideas what the deal is here?

 I googled around and not being an experienced kafka admin, someone said to
 delete the /controller entry in zk. This appears to fix the problem on
 existing topics that show no leader.

 Is it ok to do this? What am I doing by deleting /controller? Is there a
 better way?

 Thanks for any advice, and your time of course.



Hitting integer limit when setting log segment.bytes

2015-04-30 Thread Lance Laursen
Hey all,

I am attempting to create a topic which uses 8GB log segment sizes, like so:
./kafka-topics.sh --zookeeper localhost:2181 --create --topic perftest6p2r
--partitions 6 --replication-factor 2 --config max.message.bytes=655360
--config segment.bytes=8589934592

And am getting the following error:
Error while executing topic command For input string: "8589934592"
java.lang.NumberFormatException: For input string: "8589934592"
at
java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:583)
...
...

Upon further testing with --alter topic, it would appear that segment.bytes
will not accept a value higher than 2,147,483,647, which is the upper limit
for a signed 32bit int. This then restricts log segment size to an upper
limit of ~2GB.
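
For what it's worth, the same create command appears to go through if the
segment size is kept at or below that signed 32-bit limit, e.g.:

./kafka-topics.sh --zookeeper localhost:2181 --create --topic perftest6p2r
--partitions 6 --replication-factor 2 --config max.message.bytes=655360
--config segment.bytes=2147483647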

We run Kafka on hard drive dense machines, each with 10gbit uplinks. We can
set ulimits higher in order to deal with all the open file handles (since
Kafka keeps all log segment file handles open), but it would be preferable
to minimize this number, as well as minimize the amount of log segment
rollover experienced at high traffic (ie: a rollover every 1-2 seconds or
so when saturating 10gbe).

Is there a reason (performance or otherwise) that a 32 bit integer is used
rather than something larger?

Thanks,
-Lance


Leaderless topics

2015-04-30 Thread Dillian Murphey
Scenario with a 1-node broker and a 3-node zookeeper ensemble.

1) Create topic
2) Delete topic
3) Re-create with same name

I'm noticing this recreation gives me Leader: none, and Isr: empty.

Any ideas what the deal is here?

I googled around and not being an experienced kafka admin, someone said to
delete the /controller entry in zk. This appears to fix the problem on
existing topics that show no leader.

Is it ok to do this? What am I doing by deleting /controller? Is there a
better way?

Thanks for any advice, and your time of course.


Re: New Producer API - batched sync mode support

2015-04-30 Thread Roshan Naik
@Gwen, @Ewen,
  While atomicity of a batch is nice to have, it is not essential. I don't
think users always expect such atomicity. Atomicity is not even guaranteed
in many un-batched systems let alone batched systems.

As long as the client gets informed about the ones that failed in the
batch.. that would suffice.

One issue with the current flush() based batch-sync implementation is that
the client needs to iterate over *all* futures in order to scan for any
failed messages. In the common case, it is just wasted CPU cycles as there
won't be any failures. Would be ideal if the client is informed about only
problematic messages.
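
For context, here is a rough sketch of the kind of wrapper being described,
using the new producer API (types and error handling are simplified, and
flush() is assumed to be available):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BatchSendSketch {
    // Send a batch "synchronously" and report which records failed, by
    // scanning every future - the wasted CPU cycles discussed above when
    // nothing actually failed.
    static List<ProducerRecord<byte[], byte[]>> sendBatch(
            KafkaProducer<byte[], byte[]> producer,
            List<ProducerRecord<byte[], byte[]>> batch) {
        List<Future<RecordMetadata>> futures =
            new ArrayList<Future<RecordMetadata>>();
        for (ProducerRecord<byte[], byte[]> record : batch)
            futures.add(producer.send(record));

        producer.flush(); // push everything out before checking results

        List<ProducerRecord<byte[], byte[]>> failed =
            new ArrayList<ProducerRecord<byte[], byte[]>>();
        for (int i = 0; i < futures.size(); i++) {
            try {
                futures.get(i).get();     // already completed after flush()
            } catch (Exception e) {
                failed.add(batch.get(i)); // remember the records that failed
            }
        }
        return failed;
    }
}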

  IMO, adding a new send(batch) API may be meaningful if it can provide
benefits beyond what a user can do with a simple wrapper on existing stuff.
For example: eliminate the CPU cycles wasted on examining results from
successful message deliveries, or other efficiencies.



@Ivan,
   I am not certain, I am thinking that there is a possibility that the
first few messages of the batch got accepted, but not the remainder ? At
the same time based on some comments made earlier it appears underlying
implementation does have an all-or-none mechanism for a batch going to a
partition.
For simplicity, streaming clients may not want to deal explicitly with
partitions (and get exposed to repartitioning & leader change type issues)

-roshan



On 4/30/15 2:07 PM, Gwen Shapira gshap...@cloudera.com wrote:

Why do we think atomicity is expected, if the old API we are emulating
here
lacks atomicity?

I don't remember emails to the mailing list saying: "I expected this batch
to be atomic, but instead I got duplicates when retrying after a failed
batch send."
Maybe atomicity isn't as strong a requirement as we believe? That is,
everyone expects some duplicates during failure events and handles them
downstream?



On Thu, Apr 30, 2015 at 2:02 PM, Ivan Balashov ibalas...@gmail.com
wrote:

 2015-04-30 8:50 GMT+03:00 Ewen Cheslack-Postava e...@confluent.io:

  They aren't going to get this anyway (as Jay pointed out) given the
 current
  broker implementation
 

 Is it also incorrect to assume atomicity even if all messages in the
batch
 go to the same partition?




Re: Why fetching meta-data for topic is done three times?

2015-04-30 Thread Zakee
With retries set to 1, do you still see the 3 secs delay? The idea is, you can change these 
properties to reduce the time to throw the exception to 1 sec or below. Does that 
help?
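
For instance, with the old producer, a configuration along these lines (values
are only illustrative) should keep the worst case close to one second instead
of three:

message.send.max.retries=1
retry.backoff.ms=100
request.timeout.ms=1000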

Thanks
Zakee



 On Apr 28, 2015, at 10:29 PM, Madhukar Bharti bhartimadhu...@gmail.com 
 wrote:
 
 Hi Zakee,
 
 message.send.max.retries is 1
 
 Regards,
 Madhukar
 
 On Tue, Apr 28, 2015 at 6:17 PM, Madhukar Bharti bhartimadhu...@gmail.com 
 mailto:bhartimadhu...@gmail.com
 wrote:
 
 Hi Zakee,
 
 Thanks for your reply.
 
  message.send.max.retries=3
  retry.backoff.ms=100
  topic.metadata.refresh.interval.ms=600*1000
  
  These are my properties.
 
 Regards,
 Madhukar
 
 On Tue, Apr 28, 2015 at 3:26 AM, Zakee kzak...@netzero.net wrote:
 
 What values do you have for below properties? Or are these set to
 defaults?
 
 message.send.max.retries
 retry.backoff.ms
 topic.metadata.refresh.interval.ms
 
 Thanks
 Zakee
 
 
 
 On Apr 23, 2015, at 11:48 PM, Madhukar Bharti bhartimadhu...@gmail.com
 wrote:
 
 Hi All,
 
  Once I went through the code, I found that when the Producer starts it does
  three things:
  
  1. Sends a meta-data request
  2. Sends the message to the broker (fetching the broker list)
  3. If the number of messages to be produced is greater than 0, it again
  tries to refresh metadata for the outstanding produce requests.
 
  Each of these requests takes the configured timeout before going to the next,
  and finally, once all are done, it will throw an Exception (if step 3 also fails).
 
  Here the problem is: if we set the timeout to 1 sec, then it takes 3 sec to
  throw an exception, so the user request will hang for 3 sec, which is
  comparatively high for response time, and if all threads are blocked in
  producer send then the whole application will be blocked for 3 sec. So we
  want to reduce the overall time to throw the Exception to 1 sec.
 
 What is the possible way to do this?
 
 Thanks
 Madhukar
 
 On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti 
 bhartimadhu...@gmail.com
 wrote:
 
 Hi All,
 
  I came across a problem: if we use a broker IP which is not reachable or
  out of the network, then it takes more than the configured time
  (request.timeout.ms).
  After checking the log, I got to know that it is trying to fetch topic
  meta-data three times, changing the correlation id each time.
  Due to this, even though I keep request.timeout.ms=1000, it takes 3 sec
  to throw the Exception. I am using Kafka 0.8.1.1 with the patch
 
 https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
 
 
 I have attached the log. Please check this and clarify why it is
 behaving
 like this. Whether it is by design or have to set some other property
 also.
 
 
 
 Regards
 Madhukar
 
 
 
 
 
 
 
 
 


Re: Pulling Snapshots from Kafka, Log compaction last compact offset

2015-04-30 Thread Gwen Shapira
I feel a need to respond to the Sqoop-killer comment :)

1) Note that most databases have a single transaction log per db and in
order to get the correct view of the DB, you need to read it in order
(otherwise transactions will get messed up). This means you are limited to
a single producer reading data from the log, writing it to a single
partition and getting it read from a single consumer. If the database is
very large and very active, you may run into some issues there...

Because Sqoop doesn't try to catch up with all the changes, but takes a
snapshot (from multiple mappers in parallel), we can very rapidly Sqoop
10TB databases.

2) If HDFS is the target of getting data from Postgres, then postgresql ->
kafka -> HDFS seems less optimal than postgresql -> HDFS directly (in
parallel). There are good reasons to get Postgres data to Kafka, but if the
eventual goal is HDFS (or HBase), I suspect Sqoop still has a place.

3) Due to its parallelism and general purpose JDBC connector, I suspect
that Sqoop is even a very viable way of getting data into Kafka.

Gwen


On Thu, Apr 30, 2015 at 2:27 PM, Jan Filipiak jan.filip...@trivago.com
wrote:

 Hello Everyone,

  I am quite excited about the recent example of replicating PostgreSQL
  changes to Kafka. My view on the log compaction feature had always been a
  very sceptical one, but now, with its great potential exposed to the wide
  public, I think it's an awesome feature. Especially when pulling this data
  into HDFS as a snapshot, it is (IMO) a Sqoop killer. So I want to thank
  everyone who had the vision of building these kinds of systems during a time
  when I could not imagine those.

 There is one open question that I would like people to help me with. When
 pulling a snapshot of a partition into HDFS using a camus-like application
 I feel the need of keeping a Set of all keys read so far and stop as soon
  as I find a key that is already in my set. I use this as an indicator of how
 far the log compaction has happened already and only pull up to this point.
 This works quite well as I do not need to keep the messages but only their
 keys in memory.

 The question I want to raise with the community is:

 How do you prevent pulling the same record twice (in different versions)
 and would it be beneficial if the OffsetResponse would also return the
 last offset that got compacted so far and the application would just pull
 up to this point?

 Looking forward for some recommendations and comments.

 Best
 Jan




Re: Kafka 0.8.2 beta - release

2015-04-30 Thread Gomathivinayagam Muthuvinayagam
Thank you,

It seems the following methods are not supported in KafkaConsumer. Do you
know when they will be supported?

public OffsetMetadata commit(Map<TopicPartition, Long> offsets, boolean
sync) {

throw new UnsupportedOperationException();

}

Thanks & Regards,



On Wed, Apr 29, 2015 at 10:52 PM, Ewen Cheslack-Postava e...@confluent.io
wrote:

 It has already been released, including a minor revision to fix some
 critical bugs. The latest release is 0.8.2.1. The downloads page has links
 and release notes: http://kafka.apache.org/downloads.html

 On Wed, Apr 29, 2015 at 10:22 PM, Gomathivinayagam Muthuvinayagam 
 sankarm...@gmail.com wrote:

  I see a lot of interesting features with Kafka 0.8.2 beta. I am just
  wondering when that will be released. Is there any timeline for that?
 
  Thanks & Regards,
 



 --
 Thanks,
 Ewen



Re: Horizontally Scaling Kafka Consumers

2015-04-30 Thread Nimi Wariboko Jr
My mistake, it seems the Java drivers are a lot more advanced than
Shopify's Kafka driver (or I am missing something) - and I haven't used
Kafka before.

With the Go driver - it seems you have to manage offsets and partitions
within the application code, while in the Scala driver it seems you have the
option of simply subscribing to a topic, and someone else will manage that
part.

After digging around a bit more, I found there is another library -
https://github.com/wvanbergen/kafka - that speaks the consumergroup API and
accomplishes what I was looking for and I assume is implemented by keeping
track of memberships w/ Zookeeper.

Thank you for the information - it really helped clear up what I was failing to
understand with Kafka.

Nimi

On Wed, Apr 29, 2015 at 10:10 PM, Joe Stein joe.st...@stealth.ly wrote:

 You can do this with the existing Kafka Consumer

 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L106
 and probably any other Kafka client too (maybe with minor/major rework
 to do the offset management).

 The new consumer approach is more transparent on Subscribing To Specific
 Partitions

 https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L200-L234
 .

 Here is a Docker file (** pull request pending **) for wrapping kafka
 consumers (doesn't have to be the go client, need to abstract that out some
 more after more testing)

 https://github.com/stealthly/go_kafka_client/blob/mesos-marathon/consumers/Dockerfile


 Also a VM (** pull request pending **) to build container, push to local
 docker repository and launch on Apache Mesos

 https://github.com/stealthly/go_kafka_client/tree/mesos-marathon/mesos/vagrant
 as a working example of how to do it.

 All of this could be done without the Docker container and still work on
 Mesos ... or even without Mesos and on YARN.

 You might also want to check out how Samza integrates with Execution
 Frameworks

 http://samza.apache.org/learn/documentation/0.9/comparisons/introduction.html
 which has a Mesos patch https://issues.apache.org/jira/browse/SAMZA-375
 and
 built in YARN support.

 ~ Joe Stein
 - - - - - - - - - - - - - - - - -

   http://www.stealth.ly
 - - - - - - - - - - - - - - - - -

 On Wed, Apr 29, 2015 at 8:56 AM, David Corley davidcor...@gmail.com
 wrote:

  You're right Stevo, I should re-phrase to say that there can be no more
  _active_ consumers than there are partitions (within a single consumer
  group).
  I'm guessing that's what Nimi is alluding to asking, but perhaps he can
  elaborate on whether he's using consumer groups and/or whether the 100
  partitions are all for a single topic, or multiple topics.
 
  On 29 April 2015 at 13:38, Stevo Slavić ssla...@gmail.com wrote:
 
   Please correct me if wrong, but I think it is really not a hard
 constraint
   that one cannot have more consumers (from same group) than partitions
 on
   single topic - all the surplus consumers will not be assigned to
 consume
   any partition, but they can be there and as soon as one active consumer
   from same group goes offline (its connection to ZK is dropped),
 consumers
   from the group will be rebalanced so one passively waiting consumer
 will
   become active.
  
   Kind regards,
   Stevo Slavic.
  
   On Wed, Apr 29, 2015 at 2:25 PM, David Corley davidcor...@gmail.com
   wrote:
  
If the 100 partitions are all for the same topic, you can have up to
  100
consumers working as part of a single consumer group for that topic.
You cannot have more consumers than there are partitions within a
 given
consumer group.
   
On 29 April 2015 at 08:41, Nimi Wariboko Jr n...@channelmeter.com
   wrote:
   
 Hi,

 I was wondering what options there are for horizontally scaling
 kafka
 consumers? Basically if I have 100 partitions and 10 consumers, and
   want
to
 temporarily scale up to 50 consumers, what options do I have?

 So far I've thought of just simply tracking consumer membership
  somehow
 (either through Raft or zookeeper's znodes) on the consumers.