Re: Why fetching meta-data for topic is done three times?

2015-04-23 Thread Madhukar Bharti
Hi All,

After going through the code, I found that when the producer starts it does
three things:

1. Sends a metadata request
2. Sends the message to the broker (fetching the broker list)
3. If the number of messages still to be produced is greater than 0, tries
again to refresh metadata for the outstanding produce requests.

Each of these requests takes the configured timeout before moving on to the
next, and only after all of them are done does it throw an exception (if
step 3 also fails).

The problem is that if we set the timeout to 1 sec, it takes 3 sec to throw
an exception, so the user request hangs for 3 sec. That is comparatively
high for our response time, and if all threads are blocked in the
producer's send, the whole application is blocked for 3 sec. So we want to
reduce the overall time to throw the exception to 1 sec.

What is the possible way to do this?
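
For reference, a minimal sketch of the retry-related settings in the 0.8
producer config that are worth experimenting with here. Whether they
shorten the three metadata attempts is exactly the open question, and the
broker address below is a placeholder:

    # old (0.8) producer properties
    metadata.broker.list=broker1:9092
    producer.type=sync
    request.timeout.ms=1000      # per-request timeout
    message.send.max.retries=0   # default is 3; fewer retries fail faster
    retry.backoff.ms=100         # pause before each retry attempt

If the extra attempts come from the handler's internal retry loop, lowering
message.send.max.retries should shrink the worst case; if they come from
the metadata fetch itself, it may not.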

Thanks
Madhukar

On Thu, Apr 16, 2015 at 8:10 PM, Madhukar Bharti 
wrote:

> Hi All,
>
> I came across a problem: if we use a broker IP which is not reachable or
> out of the network, it takes more than the configured time
> (request.timeout.ms). After checking the log, I got to know that it is
> trying to fetch the topic metadata three times, changing the correlation
> id each time. Due to this, even though I keep request.timeout.ms=1000, it
> takes 3 sec to throw an exception. I am using Kafka 0.8.1.1 with the patch
> https://issues.apache.org/jira/secure/attachment/12678547/kafka-1733-add-connectTimeoutMs.patch
>
>
> I have attached the log. Please check it and clarify why it is behaving
> like this: whether it is by design, or whether some other property also
> has to be set.
>
>
>
> Regards
> Madhukar
>
>
>


[ANN] Apache Cloudstack 4.5 kafka-event-bus plugin

2015-04-23 Thread Pierre-Yves Ritschard
Hi list,

I thought I'd also mention that the next release of Apache CloudStack adds
the ability to publish all events happening throughout the environment to
Kafka. Events are published as JSON.

http://cloudstack-administration.readthedocs.org/en/latest/events.html#kafka-configuration

Cheers,
  - pyr


Atomic write of message batch to single partition

2015-04-23 Thread Martin Krasser

Hello,

I'm using Kafka 0.8.2.1 (in a Scala/Java project) and trying to find out 
how to atomically write n messages (message batch) to a single topic 
partition. Is there any client API that gives such a guarantee? I 
couldn't find a clear answer reading the documentation, API docs (of the 
old and new producer) and mailing list archives - sorry if I missed 
something obvious.


With the new org.apache.kafka.clients.producer.KafkaProducer, the send
method has only a single ProducerRecord parameter, and record accumulation
and batch-sending of records are an implementation detail (which can be
controlled to some extent by the batch.size configuration setting etc., but
not by user-defined message batches). So it seems that the new producer
cannot be used for that. Are there any plans to support this in future
versions?


Only the old kafka.producer.Producer allows me to pass a user-defined 
KeyedMessage batch to its send method. What are the semantics of this 
method when producer.type=sync and all KeyedMessages for a given send 
call are targeted at the same topic partition? Are these messages being 
written atomically to the partition?
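
For reference, a sketch of the call in question from Java. Whether the
broker appends the whole batch atomically is exactly what is being asked
here; the broker address, topic and keys are placeholders:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class BatchSendSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092");
            props.put("producer.type", "sync");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));

            // One user-defined batch; the shared key routes all three
            // messages to the same partition.
            List<KeyedMessage<String, String>> batch = Arrays.asList(
                new KeyedMessage<String, String>("my-topic", "k", "msg-1"),
                new KeyedMessage<String, String>("my-topic", "k", "msg-2"),
                new KeyedMessage<String, String>("my-topic", "k", "msg-3"));

            producer.send(batch);  // one sync send() of the whole batch
            producer.close();
        }
    }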


Are there other options to achieve atomic writes of user-defined message 
batches (except making the batch a single Kafka message)?


Thanks for any hints!

Regards,
Martin



article: hands-on kafka: dynamic DNS

2015-04-23 Thread Pierre-Yves Ritschard
Hi list!

I just wanted to mention a small article I put together describing an
approach that leverages log compaction when you have compound types and
messages are operations on that compound type, with an example use case:

http://spootnik.org/entries/2015/04/23_hands-on-kafka-dynamic-dns.html

Always eager to hear your feedback and other approaches.

Cheers,
  - pyr


Re: Kafka client - 0.9

2015-04-23 Thread Bharath Srinivasan
Thanks Gwen.

I'm specifically looking for the consumer rewrite API (
org.apache.kafka.clients.consumer.KafkaConsumer). Based on the wiki, this
feature is available only in 0.9.

The specific use case is that I wanted to use the high level consumer, but
with the ability to roll back the offset in case of any exceptions. Based
on the documentation, the current high level consumer API does not seem to
support this, at least not in a straightforward fashion.

Appreciate any alternate solutions.
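
For illustration, the kind of control being asked for, written against the
method names in the consumer rewrite design. This is a sketch based on that
wiki, not a released API; the broker address, group id, topic and the
process() helper are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class RollbackConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "broker1:9092");
            props.put("group.id", "my-group");
            props.put("enable.auto.commit", "false");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(props);
            consumer.subscribe(Collections.singletonList("my-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                try {
                    for (ConsumerRecord<String, String> record : records) {
                        process(record);   // application logic
                    }
                    consumer.commitSync(); // commit only after success
                } catch (Exception e) {
                    // "Rollback": rewind each partition to its last
                    // committed offset; the next poll() reprocesses
                    // from there. (0 is a simplistic fallback.)
                    for (TopicPartition tp : consumer.assignment()) {
                        OffsetAndMetadata c = consumer.committed(tp);
                        consumer.seek(tp, c == null ? 0 : c.offset());
                    }
                }
            }
        }

        static void process(ConsumerRecord<String, String> r) { /* ... */ }
    }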

On Thu, Apr 23, 2015 at 8:08 PM, Gwen Shapira  wrote:

> We don't normally plan dates for releases; when we are done with the
> features we want in the next release and are happy with the quality, we
> release. Many Apache communities are like that.
>
> If you need firmer roadmaps and specific release dates, there are a few
> vendors selling Kafka distributions and support.
>
> Are there any specific features you are waiting for?
>
> Gwen
>
> On Thu, Apr 23, 2015 at 2:25 PM, Bharath Srinivasan
>  wrote:
> > Hi,
> >
> > I'm looking for the 0.9 client release plan.
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >
> > Is there a planned date for this release?
> >
> > Thanks,
> > Bharath
>


Re: Kafka client - 0.9

2015-04-23 Thread Gwen Shapira
We don't normally plan dates for releases; when we are done with the
features we want in the next release and are happy with the quality, we
release. Many Apache communities are like that.

If you need firmer roadmaps and specific release dates, there are a few
vendors selling Kafka distributions and support.

Are there any specific features you are waiting for?

Gwen

On Thu, Apr 23, 2015 at 2:25 PM, Bharath Srinivasan
 wrote:
> Hi,
>
> I'm looking for the 0.9 client release plan.
>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
>
> Is there a planned date for this release?
>
> Thanks,
> Bharath


Re: NoClassDefFoundError at producer end

2015-04-23 Thread Gwen Shapira
Since it is a runtime error, the Maven dependency is less relevant than
what you have on your classpath (unless you built a shaded uber-jar).

You'll need the Scala runtime and the zkclient jar on the classpath; can
you check that you have those around?
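
For illustration, the dependencies that typically have to accompany
kafka_2.10 at runtime. The versions below are assumptions matching the
0.8.2.1 era, so verify them against your build:

  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.10.4</version>
  </dependency>
  <dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.3</version>
  </dependency>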

On Thu, Apr 23, 2015 at 6:15 AM, abdul hameed pathan
 wrote:
> Hi,
>
> we are getting a NoClassDefFoundError at our producer end. We are using
> only a sync producer. We have 3 data-producing systems that use the same
> kafka.javaapi.producer.Producer instance. Below is the maven dependency:
>
>   <dependency>
>     <groupId>org.apache.kafka</groupId>
>     <artifactId>kafka_2.10</artifactId>
>     <version>0.8.2.1</version>
>   </dependency>
>
> Below is the stack trace of the exception.
>
> Exception in thread "ActiveMQ Session Task-1749"
> java.lang.NoClassDefFoundError:
> kafka/producer/async/DefaultEventHandler$$anonfun$dispatchSerializedData$3
> at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
> at kafka.producer.Producer.send(Producer.scala:77)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at
> com.snapdeal.services.requeue.impl.RequeueServiceImpl.producerSend(RequeueServiceImpl.java:107)
> at
> com.snapdeal.services.requeue.impl.RequeueServiceImpl.requeueToKafka(RequeueServiceImpl.java:86)
> at
> com.snapdeal.services.requeue.impl.RequeueServiceImpl.requeue(RequeueServiceImpl.java:71)
> at
> com.snapdeal.services.indexer.impl.IndexerCatalogServiceImpl.indexCatalogInfo(IndexerCatalogServiceImpl.java:344)
> at
> com.snapdeal.services.indexer.impl.SupcIndexerServiceImpl.index(SupcIndexerServiceImpl.java:63)
> at
> com.snapdeal.indexer.listener.impl.InventoryUpdateQueueListener.index(InventoryUpdateQueueListener.java:73)
> at
> com.snapdeal.indexer.listener.AbstractQueueListenerService.callIndex(AbstractQueueListenerService.java:82)
> at
> com.snapdeal.indexer.listener.impl.InventoryUpdateQueueListener.onMessage(InventoryUpdateQueueListener.java:92)
> at
> org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1298)
> at
> org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:131)
> at
> org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:202)
> at
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
> at
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
>
>
>
> 00:00:15,525#
> [requestId=142967906333580768|appIdent=SearchAdmin|appIP=30.0.0.165|apiVariantId=null]
> ERROR [HttpSender] - Error http to :
> http://internal-catalog-lb-mw-2086838461.ap-southeast-1.elb.amazonaws.com:8080/service/product/getPOGDetailListByIdList
> java.net.SocketException: Too many open files
> at java.net.Socket.createImpl(Socket.java:447)
> at java.net.Socket.getImpl(Socket.java:510)
> at java.net.Socket.setSoTimeout(Socket.java:1105)
> at
> org.apache.http.conn.scheme.PlainSocketFactory.connectSocket(PlainSocketFactory.java:116)
> at
> org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
> at
> org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144)
> at
> org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:131)
> at
> org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
> at
> org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
> at
> org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
> at
> org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
>
> Could you please help us to resolve these two issues?
> Regards,
> Abdul Hameed
> +919599216373


Re: Remote kafka - Connection refused.

2015-04-23 Thread Gwen Shapira
Perhaps this will help:
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan'tmyconsumers/producersconnecttothebrokers?
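
The usual fix from that FAQ entry involves the advertised listener settings
in server.properties; the hostname below is a placeholder:

    # server.properties: the address the broker hands back to clients in
    # metadata responses; it must be resolvable and reachable from the client
    advertised.host.name=kafka.example.com
    advertised.port=9092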

On Thu, Apr 23, 2015 at 3:24 PM, madhavan kumar
 wrote:
> Dear all,
> I am trying to connect my Python consumer to a remote Kafka server, but
> in kafka/conn.py#reinit the socket call socket.create_connection throws a
> "connection refused" error.
> To confirm it is not a firewall issue, I tried connecting to other servers
> (on the same machine as the Kafka server) from my local machine, and the
> socket connection works; "connection refused" happens only for the Kafka
> server. I am running kafka-0.8.2. In which log file can I see the stack
> traces or other useful information for errors related to connection
> requests?
>
> thanks,
> saravana


Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Jay Kreps
Hey Sriharsha,

Great, thanks!

For 4:

Yeah, the use case for init and close is making use of any kind of
metadata. An example of this would be range partitioning: you need to map
lexicographic ranges to numeric partitions. You might do this by adding a
new property to the config such as
   partitioner.metadata=a:0, b:1, ..., z:26
Or likewise you might have a partitioner built using Java's digest
interface, where the config would be the digest algorithm name.

Or you might need to maintain this dynamically and have the partitioner
fetch this list on initialization from some central store (zk or whatever).

The init method we should use is the Configurable interface, that will
automatically pass in the configuration given to the producer at
instantiation time.

I agree that these additional methods add a bit of complexity and often
aren't needed, but on the flip side it is often hard to use the interface
without them when you do need them.

5. Yeah that's how the current partitioner works, but that is just because
it is a non-public interface. It's not clear to me if the partitioner
should override the partition or not. We could either say:
a. The partitioner is the default policy and the partition field is a way
to override that on a per-record basis for cases where you need that or
where it is simpler. If this is our description then the partitioner should
only take effect if partition==null
b. The partition the user specifies is just a suggestion and the
partitioner can interpret or override that in whatever way they want.

I think (a) may actually make more sense. The reason is that otherwise the
behavior of the partition field in ProducerRecord will be very hard to
depend on, as the effect it has will be totally dependent on the
partitioner that is set. Any correct partitioner will basically have to
handle the case where the partition is set, and I think the only sensible
thing then is to use it as the partition (right?).

Dunno, what do you think...?
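
For illustration, one shape the interface and the option-(a) dispatch could
take. This is a sketch of the proposal under discussion, not a settled API:

    // Raw and serialized forms of key and value (points 2-3), plus
    // Configurable/Closeable instead of init()/close() (point 4).
    public interface Partitioner extends Configurable, Closeable {
        int partition(String topic, Object key, byte[] keyBytes,
                      Object value, byte[] valueBytes, Cluster cluster);
    }

    // Option (a): an explicit partition on the record wins, and the
    // partitioner is consulted only when record.partition() == null.
    static int choosePartition(ProducerRecord<byte[], byte[]> record,
                               Partitioner partitioner, byte[] keyBytes,
                               byte[] valueBytes, Cluster cluster) {
        return record.partition() != null
            ? record.partition()
            : partitioner.partition(record.topic(), record.key(), keyBytes,
                                    record.value(), valueBytes, cluster);
    }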

-Jay

On Thu, Apr 23, 2015 at 2:59 PM, Sriharsha Chintalapani <
harsh...@fastmail.fm> wrote:

> Hi Jay,
> Sorry about the KIP formatting. I fixed those in the KIP.
>
> 2. We certainly need to add both the serialized and unserialized form for
> the key as both are useful.
>
> I added those to the interface.
>
> 3. Do we need to add the value? I suspect people will have uses for
> computing something off a few fields in the value to choose the partition.
> This would be useful in cases where the key was being used for log
> compaction purposes and did not contain the full information for computing
> the partition.
>
> added it as well.
>
> 4. This interface doesn't include either an init() or close() method. It
> should implement Closeable and Configurable, right?
>
> I am not quite sure about having init() or close() for the partitioner.
> Are we looking at the partitioner using some external resources to
> initialize and close? If that's the case, then init() should also take in
> some config as a param, and this can add more complexity.
>
>
> 5. What happens if the user both sets the partition id in the
> ProducerRecord and sets a partitioner? Does the partition id just get
> passed in to the partitioner (as sort of implied in this interface?). This
> is a bit weird since if you pass in the partition id you kind of expect it
> to get used, right? Or is it the case that if you specify a partition the
> partitioner isn't used at all (in which case no point in including
> partition in the Partitioner api).
>
> In the current ProducerRecord the partition id is getting passed to the
> Partitioner. If a custom partitioner is not going to use it, then that's
> up to their implementation, right? Similarly, in our interface we have
> Value as another param; this may or may not be used. Essentially it's up
> to the Partitioner to disclose what available information they are going
> to partition against.
>
> Thanks,
> Harsha
>
>
> On April 23, 2015 at 9:11:33 AM, Jay Kreps (jay.kr...@gmail.com) wrote:
>
> Hey Harsha,
>
> A few comments:
>
> Can you finish up the KIP; there are some unfinished sentences and odd
> whitespace things going on.
>
> Here are the questions I think we should consider:
> 1. Do we need this at all given that we have the partition argument in
> ProducerRecord which gives full control? I think we do need it because
> this
> is a way to plug in a different partitioning strategy at run time and do
> it
> in a fairly transparent way.
> 2. We certainly need to add both the serialized and unserialized form for
> the key as both are useful.
> 3. Do we need to add the value? I suspect people will have uses for
> computing something off a few fields in the value to choose the partition.
> This would be useful in cases where the key was being used for log
> compaction purposes and did not contain the full information for computing
> the partition.
> 4. This interface doesn't include either an init() or close() method. It
> should implement Closeable and Configurable, right?

Remote kafka - Connection refused.

2015-04-23 Thread madhavan kumar
Dear all,
I am trying to connect my Python consumer to a remote Kafka server, but
in kafka/conn.py#reinit the socket call socket.create_connection throws a
"connection refused" error.
To confirm it is not a firewall issue, I tried connecting to other servers
(on the same machine as the Kafka server) from my local machine, and the
socket connection works; "connection refused" happens only for the Kafka
server. I am running kafka-0.8.2. In which log file can I see the stack
traces or other useful information for errors related to connection
requests?

thanks,
saravana


Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Sriharsha Chintalapani
Hi Jay,
         Sorry about the KIP formatting. I fixed those in the KIP.

2. We certainly need to add both the serialized and unserialized form for 
the key as both are useful. 
I added those to the interface.

3. Do we need to add the value? I suspect people will have uses for 
computing something off a few fields in the value to choose the partition. 
This would be useful in cases where the key was being used for log 
compaction purposes and did not contain the full information for computing 
the partition. 
added it as well.

4. This interface doesn't include either an init() or close() method. It
should implement Closeable and Configurable, right?
I am not quite sure about having init() or close() for the partitioner. Are
we looking at the partitioner using some external resources to initialize
and close? If that's the case, then init() should also take in some config
as a param, and this can add more complexity.



5. What happens if the user both sets the partition id in the 
ProducerRecord and sets a partitioner? Does the partition id just get 
passed in to the partitioner (as sort of implied in this interface?). This 
is a bit weird since if you pass in the partition id you kind of expect it 
to get used, right? Or is it the case that if you specify a partition the 
partitioner isn't used at all (in which case no point in including 
partition in the Partitioner api). 
In the current ProducerRecord the partition id is getting passed to the
Partitioner. If a custom partitioner is not going to use it, then that's up
to their implementation, right? Similarly, in our interface we have Value
as another param; this may or may not be used. Essentially it's up to the
Partitioner to disclose what available information they are going to
partition against.


Thanks,
Harsha


On April 23, 2015 at 9:11:33 AM, Jay Kreps (jay.kr...@gmail.com) wrote:

Hey Harsha,  

A few comments:  

Can you finish up the KIP; there are some unfinished sentences and odd
whitespace things going on.

Here are the questions I think we should consider:  
1. Do we need this at all given that we have the partition argument in  
ProducerRecord which gives full control? I think we do need it because this  
is a way to plug in a different partitioning strategy at run time and do it  
in a fairly transparent way.  
2. We certainly need to add both the serialized and unserialized form for  
the key as both are useful.  
3. Do we need to add the value? I suspect people will have uses for  
computing something off a few fields in the value to choose the partition.  
This would be useful in cases where the key was being used for log  
compaction purposes and did not contain the full information for computing  
the partition.  
4. This interface doesn't include either an init() or close() method. It
should implement Closeable and Configurable, right?
5. What happens if the user both sets the partition id in the  
ProducerRecord and sets a partitioner? Does the partition id just get  
passed in to the partitioner (as sort of implied in this interface?). This  
is a bit weird since if you pass in the partition id you kind of expect it  
to get used, right? Or is it the case that if you specify a partition the  
partitioner isn't used at all (in which case no point in including  
partition in the Partitioner api).  

Cheers,  

-Jay  

On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani   
wrote:  

> Hi,  
> Here is the KIP for adding a partitioner interface for the producer.
>  
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
>   
> There is one open question about how the interface should look. Please
> take a look and let me know if you prefer one way or the other.
>  
> Thanks,  
> Harsha  
>  
>  


NoClassDefFoundError at producer end

2015-04-23 Thread abdul hameed pathan
Hi,

we are getting a NoClassDefFoundError at our producer end. We are using
only a sync producer. We have 3 data-producing systems that use the same
kafka.javaapi.producer.Producer instance. Below is the maven dependency:

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.2.1</version>
  </dependency>

Below is the stack trace of the exception.

Exception in thread "ActiveMQ Session Task-1749"
java.lang.NoClassDefFoundError:
kafka/producer/async/DefaultEventHandler$$anonfun$dispatchSerializedData$3
at
kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:99)
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
at kafka.producer.Producer.send(Producer.scala:77)
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
at
com.snapdeal.services.requeue.impl.RequeueServiceImpl.producerSend(RequeueServiceImpl.java:107)
at
com.snapdeal.services.requeue.impl.RequeueServiceImpl.requeueToKafka(RequeueServiceImpl.java:86)
at
com.snapdeal.services.requeue.impl.RequeueServiceImpl.requeue(RequeueServiceImpl.java:71)
at
com.snapdeal.services.indexer.impl.IndexerCatalogServiceImpl.indexCatalogInfo(IndexerCatalogServiceImpl.java:344)
at
com.snapdeal.services.indexer.impl.SupcIndexerServiceImpl.index(SupcIndexerServiceImpl.java:63)
at
com.snapdeal.indexer.listener.impl.InventoryUpdateQueueListener.index(InventoryUpdateQueueListener.java:73)
at
com.snapdeal.indexer.listener.AbstractQueueListenerService.callIndex(AbstractQueueListenerService.java:82)
at
com.snapdeal.indexer.listener.impl.InventoryUpdateQueueListener.onMessage(InventoryUpdateQueueListener.java:92)
at
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1298)
at
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:131)
at
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:202)
at
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:122)
at
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:43)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)




00:00:15,525#
[requestId=142967906333580768|appIdent=SearchAdmin|appIP=30.0.0.165|apiVariantId=null]
ERROR [HttpSender] - Error http to :
http://internal-catalog-lb-mw-2086838461.ap-southeast-1.elb.amazonaws.com:8080/service/product/getPOGDetailListByIdList
java.net.SocketException: Too many open files
at java.net.Socket.createImpl(Socket.java:447)
at java.net.Socket.getImpl(Socket.java:510)
at java.net.Socket.setSoTimeout(Socket.java:1105)
at
org.apache.http.conn.scheme.PlainSocketFactory.connectSocket(PlainSocketFactory.java:116)
at
org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
at
org.apache.http.impl.conn.AbstractPoolEntry.open(AbstractPoolEntry.java:144)
at
org.apache.http.impl.conn.AbstractPooledConnAdapter.open(AbstractPooledConnAdapter.java:131)
at
org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:610)
at
org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:445)
at
org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
at
org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)

Could you please help us to resolve these two issues?
Regards,
Abdul Hameed
+919599216373


kafka user group in los angeles

2015-04-23 Thread Alex Toth
Hi,
Sorry this isn't directly a Kafka question, but I was wondering if there
are any Kafka user groups in (or within driving range of) Los Angeles.
Looking through meetup.com and the usual web search engines hasn't brought
me much besides the LA Hadoop user group, and I was hoping for something
more specific.
If I should have asked this somewhere else, again, sorry and let me know.


  alex


Kafka client - 0.9

2015-04-23 Thread Bharath Srinivasan
Hi,

I'm looking for the 0.9 client release plan.

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Is there a planned date for this release?

Thanks,
Bharath


Re: Horizontal scaling a topic

2015-04-23 Thread Yury Ruchin
For Kafka 0.8.x[.x], refer to
https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-6.ReassignPartitionsTool
.
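
Roughly, the workflow with that tool looks like this; the JSON file names,
ZooKeeper address and broker ids below are placeholders:

    # topics-to-move.json: {"topics": [{"topic": "my-topic"}], "version": 1}
    # 1. Generate a candidate assignment spread across all 5 brokers:
    bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
      --topics-to-move-json-file topics-to-move.json \
      --broker-list "0,1,2,3,4" --generate
    # 2. Save the proposed assignment to expand.json, then execute it:
    bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
      --reassignment-json-file expand.json --execute
    # 3. Check progress:
    bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
      --reassignment-json-file expand.json --verify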

2015-04-23 23:20 GMT+03:00 Corey Nolet :

> I have a cluster of 3 nodes and I've created a topic with some number of
> partitions and some number of replicas, let's say 10 and 2, respectively.
> Later, after I've got my 3 nodes fairly consumed with data in the 10
> partitions, I want to add 2 more nodes to the mix to help balance out the
> partitions/replicas of my topic across 5 physical nodes instead of just 3.
>
> I was assuming Kafka would just notice the new node and auto-replicate
> partitions to it but research is telling me that this probably isn't the
> case. Let's say I want no data loss and I want Kafka to spread my 10
> partitions across all 5 nodes. How would I do this currently?
>


Horizontal scaling a topic

2015-04-23 Thread Corey Nolet
I have a cluster of 3 nodes and I've created a topic with some number of
partitions and some number of replicas, let's say 10 and 2, respectively.
Later, after I've got my 3 nodes fairly consumed with data in the 10
partitions, I want to add 2 more nodes to the mix to help balance out the
partitions/replicas of my topic across 5 physical nodes instead of just 3.

I was assuming Kafka would just notice the new node and auto-replicate
partitions to it but research is telling me that this probably isn't the
case. Let's say I want no data loss and I want Kafka to spread my 10
partitions across all 5 nodes. How would I do this currently?


Re: Fetch Request Purgatory and Mirrormaker

2015-04-23 Thread Evan Huus
This is still occurring for us. In addition, it has started occurring on
one of the six nodes in the "healthy" cluster, for no reason we have been
able to determine.

We're willing to put in some serious time to help debug/solve this, but we
need *some* hint as to where to start. I understand that purgatory has been
rewritten (again) in 0.8.3, so might it be worth trying a trunk build? Is
there an ETA for a beta release of 0.8.3?

Thanks,
Evan

On Tue, Apr 14, 2015 at 8:40 PM, Evan Huus  wrote:

> On Tue, Apr 14, 2015 at 8:31 PM, Jiangjie Qin 
> wrote:
>
>> Hey Evan,
>>
>> Is this issue only observed when mirror maker is consuming? It looks that
>> for Cluster A you have some other consumers.
>> Do you mean if you stop mirror maker the problem goes away?
>>
>
> Yes, exactly. The setup is A -> Mirrormaker -> B so mirrormaker is
> consuming from A and producing to B.
>
> Cluster A is always fine. Cluster B is fine when mirrormaker is stopped.
> Cluster B has the weird purgatory issue when mirrormaker is running.
>
> Today I rolled out a change to reduce the
> `fetch.purgatory.purge.interval.requests` and
> `producer.purgatory.purge.interval.requests` configuration values on
> cluster B from 1000 to 200, but it had no effect, which I find really weird.
>
> Thanks,
> Evan
>
>
>> Jiangjie (Becket) Qin
>>
>> On 4/14/15, 6:55 AM, "Evan Huus"  wrote:
>>
>> >Any ideas on this? It's still occurring...
>> >
>> >Is there a separate mailing list or project for mirrormaker that I could
>> >ask?
>> >
>> >Thanks,
>> >Evan
>> >
>> >On Thu, Apr 9, 2015 at 4:36 PM, Evan Huus  wrote:
>> >
>> >> Hey Folks, we're running into an odd issue with mirrormaker and the
>> >>fetch
>> >> request purgatory on the brokers. Our setup consists of two six-node
>> >> clusters (all running 0.8.2.1 on identical hw with the same config).
>> All
>> >> "normal" producing and consuming happens on cluster A. Mirrormaker has
>> >>been
>> >> set up to copy all topics (except a tiny blacklist) from cluster A to
>> >> cluster B.
>> >>
>> >> Cluster A is completely healthy at the moment. Cluster B is not, which
>> >>is
>> >> very odd since it is literally handling the exact same traffic.
>> >>
>> >> The graph for Fetch Request Purgatory Size looks like this:
>> >>
>> >>
>> >> https://www.dropbox.com/s/k87wyhzo40h8gnk/Screenshot%202015-04-09%2016.08.37.png?dl=0
>> >>
>> >> Every time the purgatory shrinks, the latency from that causes one or
>> >>more
>> >> nodes to drop their leadership (it quickly recovers). We could probably
>> >> alleviate the symptoms by decreasing
>> >> `fetch.purgatory.purge.interval.requests` (it is currently at the
>> >>default
>> >> value) but I'd rather try and understand/solve the root cause here.
>> >>
>> >> Cluster B is handling no outside fetch requests, and turning
>> mirrormaker
>> >> off "fixes" the problem, so clearly (since mirrormaker is producing to
>> >>this
>> >> cluster not consuming from it) the fetch requests must be coming from
>> >> internal replication. However, the same data is being replicated when
>> >>it is
>> >> originally produced in cluster A, and the fetch purgatory size sits
>> >>stably
>> >> at ~10k there. There is nothing unusual in the logs on either cluster.
>> >>
>> >> I have read all the wiki pages and jira tickets I can find about the
>> new
>> >> purgatory design in 0.8.2 but nothing stands out as applicable. I'm
>> >>happy
>> >> to provide more detailed logs, configuration, etc. if anyone thinks
>> >>there
>> >> might be something important in there. I am completely baffled as to
>> >>what
>> >> could be causing this.
>> >>
>> >> Any suggestions would be appreciated. I'm starting to think at this
>> >>point
>> >> that we've completely misunderstood or misconfigured *something*.
>> >>
>> >> Thanks,
>> >> Evan
>> >>
>>
>>
>


Re: How to set console consumer group ID

2015-04-23 Thread Lukáš Havrlant
Thank you, Raja!

2015-04-23 0:34 GMT+02:00 Rajasekar Elango :

> Yes, you pass any consumer property including group.id by having them in
> property file and passing path to it using --consumer.config of consumer
> consumer.
>
> Thanks,
> Raja.
>
> On Wed, Apr 22, 2015 at 1:45 AM, Lukáš Havrlant  wrote:
>
> > Hi,
> > is it possible to set group ID for console consumer on command line?
> > Something like
> >
> > $ bin/kafka-console-consumer.sh --groupid myGroupId
> >
> > Lukáš
> >
>
>
>
> --
> Thanks,
> Raja.
>
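
Concretely, the suggested approach looks like this; the file name, group
id, ZooKeeper address and topic are placeholders:

    # consumer.properties
    group.id=myGroupId

    $ bin/kafka-console-consumer.sh --zookeeper localhost:2181 \
        --topic my-topic --consumer.config consumer.properties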


Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Jay Kreps
Hey Harsha,

A few comments:

Can you finish up the KIP; there are some unfinished sentences and odd
whitespace things going on.

Here are the questions I think we should consider:
1. Do we need this at all given that we have the partition argument in
ProducerRecord which gives full control? I think we do need it because this
is a way to plug in a different partitioning strategy at run time and do it
in a fairly transparent way.
2. We certainly need to add both the serialized and unserialized form for
the key as both are useful.
3. Do we need to add the value? I suspect people will have uses for
computing something off a few fields in the value to choose the partition.
This would be useful in cases where the key was being used for log
compaction purposes and did not contain the full information for computing
the partition.
4. This interface doesn't include either an init() or close() method. It
should implement Closeable and Configurable, right?
5. What happens if the user both sets the partition id in the
ProducerRecord and sets a partitioner? Does the partition id just get
passed in to the partitioner (as sort of implied in this interface?). This
is a bit weird since if you pass in the partition id you kind of expect it
to get used, right? Or is it the case that if you specify a partition the
partitioner isn't used at all (in which case no point in including
partition in the Partitioner api).

Cheers,

-Jay

On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani 
wrote:

> Hi,
> Here is the KIP for adding a partitioner interface for the producer.
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
> There is one open question about how the interface should look. Please
> take a look and let me know if you prefer one way or the other.
>
> Thanks,
> Harsha
>
>


[ANN] Bottled Water: PostgreSQL to Kafka replication

2015-04-23 Thread Martin Kleppmann
Hi Kafka users,

I'd like to announce a new open source project, called "Bottled Water", for 
getting data from PostgreSQL into Kafka:
http://blog.confluent.io/2015/04/23/bottled-water-real-time-integration-of-postgresql-and-kafka/
https://github.com/confluentinc/bottledwater-pg/

Bottled Water combines a consistent snapshot with logical replication, so you 
can get a full dump of your database in Kafka, plus low-latency messages 
whenever a row is inserted, updated or deleted in the database. This allows you 
to write Kafka consumers which maintain a copy of a database in a downstream 
system, such as a full-text search index, caches, data warehouse, HDFS, etc.

Bottled Water uses the new logical decoding feature in Postgres 9.4, and the 
log compaction feature in Kafka. Each table in Postgres becomes a topic in 
Kafka, and the table schema is automatically converted into an Avro schema for 
export.

It's an alpha release that is not yet fit for production use, but it's ready 
for experimentation. Feedback and contributions welcome!

Martin



[KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Sriharsha Chintalapani
Hi,
Here is the KIP for adding a partitioner interface for the producer.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
There is one open question about how the interface should look. Please take
a look and let me know if you prefer one way or the other.

Thanks,
Harsha