Re: [DISCUSSION] adding the serializer api back to the new java producer

2014-11-25 Thread Jonathan Weeks
+1 on this change — APIs are forever. As much as we’d love to see 0.8.2 released 
ASAP, it is important to get this right.

-JW

 On Nov 24, 2014, at 5:58 PM, Jun Rao jun...@gmail.com wrote:
 
 Hi, Everyone,
 
 I'd like to start a discussion on whether it makes sense to add the
 serializer api back to the new java producer. Currently, the new java
 producer takes a byte array for both the key and the value. While this api
 is simple, it pushes the serialization logic into the application. This
 makes it hard to reason about what type of data is being sent to Kafka and
 also makes it hard to share an implementation of the serializer. For
 example, to support Avro, the serialization logic could be quite involved
 since it might need to register the Avro schema in some remote registry and
 maintain a schema cache locally, etc. Without a serialization api, it's
 impossible to share such an implementation so that people can easily reuse it.
 We sort of overlooked this implication during the initial discussion of the
 producer api.
 
 So, I'd like to propose an api change to the new producer by adding back
 the serializer api similar to what we had in the old producer. Specifically,
 the proposed api changes are the following.
 
 First, we change KafkaProducer to take generic types K and V for the key
 and the value, respectively.
 
 public class KafkaProducer<K,V> implements Producer<K,V> {
 
    public Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback);
 
    public Future<RecordMetadata> send(ProducerRecord<K,V> record);
 }
 
 Second, we add two new configs, one for the key serializer and another for
 the value serializer. Both serializers will default to the byte array
 implementation.
 
 public class ProducerConfig extends AbstractConfig {
 
    .define(KEY_SERIALIZER_CLASS_CONFIG, Type.CLASS,
            "org.apache.kafka.clients.producer.ByteArraySerializer",
            Importance.HIGH, KEY_SERIALIZER_CLASS_DOC)
    .define(VALUE_SERIALIZER_CLASS_CONFIG, Type.CLASS,
            "org.apache.kafka.clients.producer.ByteArraySerializer",
            Importance.HIGH, VALUE_SERIALIZER_CLASS_DOC);
 }
 
 Both serializers will implement the following interface.
 
 public interface Serializer<T> extends Configurable {
 
    public byte[] serialize(String topic, T data, boolean isKey);
 
    public void close();
 }
 
 This is more or less the same as what's in the old producer. The slight
 differences are (1) the serializer now only requires a parameter-less
 constructor; (2) the serializer has a configure() and a close() method for
 initialization and cleanup, respectively; (3) the serialize() method
 additionally takes the topic and an isKey indicator, both of which are
 useful for things like schema registration.
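 
 (For illustration only, and not something in Jun's message or the KAFKA-1797 patch:
 a minimal sketch of what a reusable implementation of the proposed interface might
 look like, e.g. a UTF-8 string serializer, assuming Configurable contributes a
 configure(Map<String, ?>) method. The class and its bodies here are made up.)
 
 import java.nio.charset.Charset;
 import java.util.Map;
 
 public class StringSerializer implements Serializer<String> {
 
    private static final Charset UTF8 = Charset.forName("UTF-8");
 
    @Override
    public void configure(Map<String, ?> configs) {
        // no-op here; an Avro serializer might read a schema-registry URL from configs
    }
 
    @Override
    public byte[] serialize(String topic, String data, boolean isKey) {
        // topic and isKey are available for per-topic or key-vs-value schema registration
        return data == null ? null : data.getBytes(UTF8);
    }
 
    @Override
    public void close() {
        // nothing to clean up for plain strings
    }
 }
 
 An application would then point the two serializer configs at such a class and
 construct a KafkaProducer<String, String>, handing it ProducerRecord<String, String>
 instances directly.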
 
 The detailed changes are included in KAFKA-1797. For completeness, I also
 made the corresponding changes for the new java consumer api.
 
 Note that the proposed api changes are incompatible with what's in the
 0.8.2 branch. However, if those api changes are beneficial, it's probably
 better to include them now in the 0.8.2 release, rather than later.
 
 I'd like to discuss mainly two things in this thread.
 1. Do people feel that the proposed api changes are reasonable?
 2. Are there any concerns of including the api changes in the 0.8.2 final
 release?
 
 Thanks,
 
 Jun



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
There are various costs when a broker fails, including broker leader election 
for each partition, etc., as well as exposing possible issues for in-flight 
messages, and client rebalancing etc.

So even though replication provides partition redundancy, RAID 10 on each 
broker is usually a good tradeoff: it protects against the most common cause of 
broker server failure (disk failure) and makes for smoother operation overall.

Best Regards,

-Jonathan


On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com wrote:

 RAID-10?
 Interesting choice for a system where the data is already replicated
 between nodes. Is it to avoid the cost of large replication over the
 network? how large are these disks?
 
 On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com wrote:
 In fact there are many more than 4000 open files. Many of our brokers run
 with 28,000+ open files (regular file handles, not network connections). In
 our case, we're beefing up the disk performance as much as we can by
 running in a RAID-10 configuration with 14 disks.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com wrote:
 
 Todd,
 
 Actually I'm wondering how kafka handles so many partitions: with one
 partition there is at least one file on disk, and with 4000 partitions,
 there will be at least 4000 files.
 
 When all these partitions have write requests, how does Kafka keep the write
 operations on disk sequential (which is emphasized in the design document of
 Kafka) and make sure the disk access is effective?
 
 Thank you for your reply.
 
 xiaobinshe
 
 
 
 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:
 
 As far as the number of partitions a single broker can handle, we've set
 our cap at 4000 partitions (including replicas). Above that we've seen
 some
 performance and stability issues.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
 wrote:
 
 hello, everyone
 
 I'm new to kafka. I'm wondering what's the max number of partitions one
 single machine can handle in Kafka?
 
 Is there a suggested number?
 
 Thanks.
 
 xiaobinshe
 
 
 



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
Neha, 

Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is definitely 
very painful, but less so with RAID 10.

We have been using the guidance here:

http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site Reliability 
Engineers state they run RAID 10 on all Kafka clusters @34:40 or so)

Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations

LinkedIn
Hardware
We are using dual quad-core Intel Xeon machines with 24GB of memory. In general 
this should not matter too much, we only see pretty low CPU usage at peak even 
with GZIP compression enabled and a number of clients that don't batch 
requests. The memory is probably more than is needed for caching the active 
segments of the log.
The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID 10 
array. In general this is the performance bottleneck, and more disks is more 
better. Depending on how you configure flush behavior you may or may not 
benefit from more expensive disks (if you flush often then higher RPM SAS 
drives may be better).
OS Settings
We use Linux. Ext4 is the filesystem and we run using software RAID 10. We 
haven't benchmarked filesystems so other filesystems may be superior.
We have added two tuning changes: (1) we upped the number of file descriptors 
since we have lots of topics and lots of connections, and (2) we upped the max 
socket buffer size to enable high-performance data transfer between data 
centers (described here).


Best Regards,

-Jonathan



On Oct 22, 2014, at 3:44 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 In my experience, RAID 10 doesn't really provide value in the presence of
 replication. When a disk fails, the RAID resync process is so I/O intensive
 that it renders the broker useless until it completes. When this happens,
 you actually have to take the broker out of rotation and move the leaders
 off of it to prevent it from serving requests in a degraded state. You
 might as well shutdown the broker, delete the broker's data and let it
 catch up from the leader.
 
 On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
 Makes sense. Thanks :)
 
 On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
 jonathanbwe...@gmail.com wrote:
 There are various costs when a broker fails, including broker leader
 election for each partition, etc., as well as exposing possible issues for
 in-flight messages, and client rebalancing etc.
 
 So even though replication provides partition redundancy, RAID 10 on
 each broker is usually a good tradeoff to prevent the typical most common
 cause of broker server failure (e.g. disk failure) as well, and overall
 smoother operation.
 
 Best Regards,
 
 -Jonathan
 
 
 On Oct 22, 2014, at 11:01 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
 RAID-10?
 Interesting choice for a system where the data is already replicated
 between nodes. Is it to avoid the cost of large replication over the
 network? how large are these disks?
 
 On Wed, Oct 22, 2014 at 10:00 AM, Todd Palino tpal...@gmail.com
 wrote:
 In fact there are many more than 4000 open files. Many of our brokers
 run
 with 28,000+ open files (regular file handles, not network
 connections). In
 our case, we're beefing up the disk performance as much as we can by
 running in a RAID-10 configuration with 14 disks.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 7:58 PM, Xiaobin She xiaobin...@gmail.com
 wrote:
 
 Todd,
 
 Actually I'm wondering how kafka handle so much partition, with one
 partition there is at least one file on disk, and with 4000 partition,
 there will be at least 4000 files.
 
 When all these partitions have write request, how did Kafka make the
 write
 operation on the disk to be sequential (which is emphasized in the
 design
 document of Kafka) and make sure the disk access is effective?
 
 Thank you for your reply.
 
 xiaobinshe
 
 
 
 2014-10-22 5:10 GMT+08:00 Todd Palino tpal...@gmail.com:
 
 As far as the number of partitions a single broker can handle, we've
 set
 our cap at 4000 partitions (including replicas). Above that we've
 seen
 some
 performance and stability issues.
 
 -Todd
 
 On Tue, Oct 21, 2014 at 12:15 AM, Xiaobin She xiaobin...@gmail.com
 wrote:
 
 hello, everyone
 
 I'm new to kafka, I'm wondering what's the max num of partition can
 one
 siggle machine handle in Kafka?
 
 Is there an sugeest num?
 
 Thanks.
 
 xiaobinshe
 
 
 
 
 



Re: How many partition can one single machine handle in Kafka?

2014-10-22 Thread Jonathan Weeks
I suppose it also is going to depend on:

a) How much spare I/O bandwidth the brokers have to support a rebuild while 
serving ongoing requests. Our brokers have spare IO capacity.
b) How many brokers are in the cluster and what the replication factor is — 
e.g. if you have a larger cluster, it is easier to tolerate the loss of a 
single broker. We started with 3 brokers, so the loss of a single broker is 
quite significant — we would prefer possibly degraded performance to having a 
“down” broker.

I do understand that y’all both work at LinkedIn; my point is that all of the 
guidance to date (as recently as this summer) is that in production LinkedIn 
runs on RAID 10, so it is just a bit odd to hear a contrary recommendation, 
although I do understand that best practices are a moving, evolving target.

Best Regards,

-Jonathan


On Oct 22, 2014, at 4:05 PM, Todd Palino tpal...@gmail.com wrote:

 Yeah, Jonathan, I'm the LinkedIn SRE who said that :) And Neha, up until
 recently, sat 8 feet from my desk. The data from the wiki page is off a
 little bit as well (we're running 14 disks now, and 64 GB systems)
 
 So to hit the first questions, RAID 10 gives higher read performance, and
 also allows you to suffer a disk failure without having to drop the entire
 cluster. As Neha noted, you're going to take a hit on the rebuild, and
 because of ongoing traffic in the cluster it will be for a long time (we
 can easily take half a day to rebuild a disk). But you still get some
 benefit out of the RAID over just killing the data and letting it rebuild
 from the replica, because during that time the cluster is not under
 replicated, so you can suffer another failure. The more servers and disks
 you have, the more often disks are going to fail, not to mention other
 components. Both hardware and software. I like running on the safer side.
 
 That said, I'm not sure RAID 10 is the answer either. We're going to be
 doing some experimenting with other disk layouts shortly. We've inherited a
 lot of our architecture, and many things have changed in that time. We're
 probably going to test out RAID 5 and 6 to start with and see how much we
 lose from the parity calculations.
 
 -Todd
 
 
 On Wed, Oct 22, 2014 at 3:59 PM, Jonathan Weeks jonathanbwe...@gmail.com
 wrote:
 
 Neha,
 
 Do you mean RAID 10 or RAID 5 or 6? With RAID 5 or 6, recovery is
 definitely very painful, but less so with RAID 10.
 
 We have been using the guidance here:
 
 http://www.youtube.com/watch?v=19DvtEC0EbQ#t=190 (LinkedIn Site
 Reliability Engineers state they run RAID 10 on all Kafka clusters @34:40
 or so)
 
 Plus: https://cwiki.apache.org/confluence/display/KAFKA/Operations
 
 LinkedIn
 Hardware
 We are using dual quad-core Intel Xeon machines with 24GB of memory. In
 general this should not matter too much, we only see pretty low CPU usage
 at peak even with GZIP compression enabled and a number of clients that
 don't batch requests. The memory is probably more than is needed for
 caching the active segments of the log.
 The disk throughput is important. We have 8x7200 rpm SATA drives in a RAID
 10 array. In general this is the performance bottleneck, and more disks is
 more better. Depending on how you configure flush behavior you may or may
 not benefit from more expensive disks (if you flush often then higher RPM
 SAS drives may be better).
 OS Settings
 We use Linux. Ext4 is the filesystem and we run using software RAID 10. We
 haven't benchmarked filesystems so other filesystems may be superior.
 We have added two tuning changes: (1) we upped the number of file
 descriptors since we have lots of topics and lots of connections, and (2)
 we upped the max socket buffer size to enable high-performance data
 transfer between data centers (described here).
 
 
 Best Regards,
 
 -Jonathan
 
 
 
 On Oct 22, 2014, at 3:44 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:
 
 In my experience, RAID 10 doesn't really provide value in the presence of
 replication. When a disk fails, the RAID resync process is so I/O
 intensive
 that it renders the broker useless until it completes. When this happens,
 you actually have to take the broker out of rotation and move the leaders
 off of it to prevent it from serving requests in a degraded state. You
 might as well shutdown the broker, delete the broker's data and let it
 catch up from the leader.
 
 On Wed, Oct 22, 2014 at 11:20 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
 Makes sense. Thanks :)
 
 On Wed, Oct 22, 2014 at 11:10 AM, Jonathan Weeks
 jonathanbwe...@gmail.com wrote:
 There are various costs when a broker fails, including broker leader
 election for each partition, etc., as well as exposing possible issues
 for
 in-flight messages, and client rebalancing etc.
 
 So even though replication provides partition redundancy, RAID 10 on
 each broker is usually a good tradeoff to prevent the typical most
 common
 cause of broker server failure (e.g. disk failure) as well, and overall

Re: Create topic programmatically

2014-10-13 Thread Jonathan Weeks
Sure — take a look at the kafka unit tests as well as admin.AdminUtils, e.g.:

import kafka.admin.AdminUtils
// zkClient is an org.I0Itec.zkclient.ZkClient (typically created with kafka.utils.ZKStringSerializer);
// arguments: topic name, 10 partitions, replication factor 1
AdminUtils.createTopic(zkClient, topicNameString, 10, 1)

Best Regards,

-Jonathan

On Oct 13, 2014, at 9:58 AM, hsy...@gmail.com wrote:

 Hi guys,
 
 Besides TopicCommand, which I believe is not provided to create topic
 programmatically, is there any other way to automate creating topic in
 code? Thanks!
 
 Best,
 Siyuan



Re: [DISCUSS] 0.8.1.2 Release

2014-09-30 Thread Jonathan Weeks
I was one of those asking for 0.8.1.2 a few weeks back, when 0.8.2 was at least 
6-8 weeks out.

If we truly believe that 0.8.2 will go “golden” and stable in 2-3 weeks, I, for 
one, don’t need a 0.8.1.2, but it depends on the confidence in shipping 0.8.2 
soonish.

YMMV,

-Jonathan


On Sep 30, 2014, at 12:37 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Can we discuss the need for 0.8.1.2? I'm wondering if it's related to the
 timeline of 0.8.2 in any way? For instance, if we can get 0.8.2 out in the
 next 2-3 weeks, do we still need to get 0.8.1.2 out or can people just
 upgrade to 0.8.2?
 
 On Tue, Sep 30, 2014 at 9:53 AM, Joe Stein joe.st...@stealth.ly wrote:
 
 Hi, I wanted to kick off a specific discussion on a 0.8.1.2 release.
 
 Here are the JIRAs I would like to propose to back port a patch (if not
 already done so) and apply them to the 0.8.1 branch for a 0.8.1.2 release
 
 https://issues.apache.org/jira/browse/KAFKA-1502 (source jar is empty)
 https://issues.apache.org/jira/browse/KAFKA-1419 (cross build for scala
 2.11)
 https://issues.apache.org/jira/browse/KAFKA-1382 (Update zkVersion on
 partition state update failures)
 https://issues.apache.org/jira/browse/KAFKA-1490 (remove gradlew initial
 setup output from source distribution)
 https://issues.apache.org/jira/browse/KAFKA-1645 (some more jars in our
 src
 release)
 
 If the community and committers can comment on the patches proposed that
 would be great. If I missed any bring them up or if you think any I have
 proposed shouldn't be in the release, bring that up too please.
 
 Once we have consensus on this thread my thought was that I would apply and
 commit the agreed to tickets to the 0.8.1 branch. If any tickets don't
 apply of course a back port patch has to happen through our standard
 process (not worried about that we have some engineering cycles to
 contribute to making that happen). Once that is all done, I will build
 0.8.1.2 release artifacts and call a VOTE for RC1.
 
 /***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /
 



Re: Copying messages from a single partition topic to a multi-partition topic

2014-09-19 Thread Jonathan Weeks
I would look at writing a service that reads from your existing topic and 
writes to a new topic with (e.g. four) partitions.

You will also need to pay attention to the partitioning policy (or implement 
your own), as the default hashing in the current kafka version can lead 
to poor distribution.
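
To make that concrete, here is a rough, untested sketch of such a copy service 
using the 0.8.1 high-level consumer and old producer (topic names, ZooKeeper and 
broker addresses below are placeholders), supplying a key on each copied message 
so the backlog spreads across the new partitions:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TopicCopier {
    public static void main(String[] args) {
        Properties cprops = new Properties();
        cprops.put("zookeeper.connect", "zk1:2181");
        cprops.put("group.id", "topic-copier");
        cprops.put("auto.offset.reset", "smallest"); // start from the beginning of the backlog
        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));

        Properties pprops = new Properties();
        pprops.put("metadata.broker.list", "broker1:9092");
        pprops.put("serializer.class", "kafka.serializer.DefaultEncoder"); // raw byte[] pass-through
        Producer<byte[], byte[]> producer =
            new Producer<byte[], byte[]>(new ProducerConfig(pprops));

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(Collections.singletonMap("old-topic", 1));
        // Runs until killed; drains the single source partition and re-publishes each message.
        for (MessageAndMetadata<byte[], byte[]> m : streams.get("old-topic").get(0)) {
            // If the source messages have no key, synthesize one so the partitioner spreads
            // the copied backlog across all of the target topic's partitions.
            byte[] key = m.key() != null ? m.key() : Long.toString(m.offset()).getBytes();
            producer.send(new KeyedMessage<byte[], byte[]>("new-topic", key, m.message()));
        }
        producer.close();
        connector.shutdown();
    }
}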

Best Regards,

-Jonathan

 
On Sep 19, 2014, at 8:57 AM, Dennis Haller dhal...@talemetry.com wrote:

 Hi,
 
 We have an interesting problem to solve due to very large traffic volumes
 on particular topics. In our initial system configuration we had only one
 partition per topic, and in a couple of topics we have built up huge
 backlogs of several million messages that our consumers are slowly
 processing.
 
 However, now that we have this constant backlog, we wish to repartition
 those topics into several partitions, and allow parallel consumers to run
 to handle the high message volume.
 
 If we simply repartition the topic, say from 1 to 4 partitions, the
 backlogged messages stay in partition 1, while partitions 2,3,4 only get
 newly arrived messages. To eat away the backlog, we need to redistribute
 the backlogged messages evenly among the 4 partitions.
 
 The tools I've seen do not allow me to rewrite or replay the existing
 backlogged messages from one partition into the same or another topic with
 several partitions.
 - using kafka.tools.MirrorMaker does not allow me to move the data within
 the same zookeeper network, and
 - using kafka.tools.ReplayLogProducer does not write to multiple
 partitions. It seems that it will write only from a single partition to a
 single partition.
 
 Does anyone have any other way to solve this problem or a better way of
 using the kafka tools?
 
 Thanks
 Dennis



Re: High level consumer with separate zk

2014-09-10 Thread Jonathan Weeks
When 0.8.2 arrives in the near future, consumer offsets will be stored by the 
brokers, and thus that workload will no longer impact ZK.
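
For reference, my reading of the 0.8.2 plans (values below are illustrative) is 
that the high-level consumer will switch over via configs along these lines:

# Sketch of 0.8.2-era high-level consumer settings.
# ZooKeeper is still used for group coordination; only offset commits move to the brokers.
zookeeper.connect=zk1:2181
group.id=my-group
# Commit offsets to the broker-side offsets topic instead of ZK.
offsets.storage=kafka
# While migrating an existing group, commit to both stores, then turn this off.
dual.commit.enabled=true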

Best Regards,

-Jonathan


On Sep 10, 2014, at 8:20 AM, Mike Marzo precisionarchery...@gmail.com wrote:

 Is it possible for the high level consumer to use a different zk cluster
 than the cluster that manages broker leader election?
 The high level consumer adds a lot of value but I don't like the idea that
 bad user code could pound the core zk and effectively hurt the kafka
 brokers
 mike marzo
 908 209-4484



Re: Use case

2014-09-05 Thread Jonathan Weeks
+1

Topic deletion with 0.8.1.1 is extremely problematic. Coupled with the fact that 
rebalances and broker membership changes pay a cost per partition today (so that 
excessive partitions extend downtime in the case of a failure), this means fewer 
topics (e.g. hundreds or thousands) is a best practice in the released version of 
kafka. 

There are also secondary impacts of a high topic count — e.g. useful operational 
tools such as http://quantifind.com/KafkaOffsetMonitor/ start to become 
problematic in terms of UX with a massive number of topics.

Once topic deletion is a supported feature, the use-case outlined might be more 
tenable.

Best Regards,

-Jonathan

On Sep 5, 2014, at 4:20 AM, Sharninder sharnin...@gmail.com wrote:

 I'm not really sure about your exact use-case but I don't think having a
 topic per user is very efficient. Deleting topics in kafka, at the moment,
 isn't really straightforward. You should rethink your data pipeline a bit.
 
 Also, just because kafka has the ability to store messages for a certain
 time, don't think of it as a data store. Kafka is a streaming system, think
 of it as a fast queue that gives you the ability to move your pointer back.
 
 --
 Sharninder
 
 
 
 On Fri, Sep 5, 2014 at 4:27 PM, Aris Alexis aris.alexis@gmail.com
 wrote:
 
 Thanks for the reply. If I use it only for activity streams like twitter:
 
 I would want a topic for each #tag and a topic for each user and maybe
 foreach city. Would that be too many topics or it doesn't matter since most
 of them will be deleted in a specified interval.
 
 
 
 Best Regards,
 Aris Giachnis
 
 
 On Fri, Sep 5, 2014 at 6:57 AM, Sharninder sharnin...@gmail.com wrote:
 
 Since you want all chats and mail history persisted all the time, I
 personally wouldn't recommend kafka for your requirement. Kafka is more
 suitable as a streaming system where events expire after a certain time.
 Look at something more general purpose like hbase for persisting data
 indefinitely.
 
 So, for example all activity streams can go into kafka from where
 consumers
 will pick up messages to parse and put them to hbase or other clients.
 
 --
 Sharninder
 
 
 
 
 
 On Fri, Sep 5, 2014 at 12:05 AM, Aris Alexis snowboard...@gmail.com
 wrote:
 
 Hello,
 
 I am building a big web application that I want to be massively
 scalable
 (I
 am using cassandra and titan as a general db).
 
 I want to implement the following:
 
 real time web chat that is persisted so that user a in the future can
 recall his chat with user b,c,d much like facebook.
 mail like messages in the web application (not sure about this as it is
 somewhat covered by the first one)
 user activity streams
 users subscribing to topics for example florida/musicevents
 
 Could i use kafka for this? can you recommend another technology maybe?
 
 
 



Re: Trunk backwards compatibility (producer / consumer client questions)

2014-08-29 Thread Jonathan Weeks
Hi Jun,

Jay indicated that the new producer client on trunk is backwards compatible 
with 0.8.1.1 (see thread below) — can you elaborate?

Given the consumer re-write for 0.9, I can definitely see how that would break 
backwards compatibility, but Jay indicates that the producer on the trunk will 
work with older existing brokers...

Thanks,

-Jonathan

On Aug 29, 2014, at 10:32 AM, Jun Rao jun...@gmail.com wrote:

 The old clients will be compatible with the new broker. However, in order
 to use the new clients, you will need to upgrade to the new broker first.
 
 Thanks,
 
 Jun
 
 
 On Fri, Aug 29, 2014 at 10:09 AM, Jonathan Weeks jonathanbwe...@gmail.com
 wrote:
 
 Thanks, Jay. Follow-up questions:
 
 Some of our services will produce and consume. Is there consumer code on
 trunk that is backwards compatible with an existing 0.8.1.1 broker cluster?
 If not 0.8.1.1, will the consumer code on trunk work with a 0.8.2 broker
 cluster when 0.8.2 is released?
 
 (Our code is scala, BTW)
 
 Best Regards,
 
 -Jonathan
 
 
 On Aug 26, 2014, at 5:55 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
 Also, Jonathan, to answer your question, the new producer on trunk is
 running in prod for some use cases at LinkedIn and can be used with
 any 0.8.x. version.
 
 -Jay
 
 On Tue, Aug 26, 2014 at 12:38 PM, Jonathan Weeks
 jonathanbwe...@gmail.com wrote:
 I am interested in this very topic as well. Also, can the trunk version
 of the producer be used with an existing 0.8.1.1 broker installation, or
 does one need to wait for 0.8.2 (at least)?
 
 Thanks,
 
 -Jonathan
 
 On Aug 26, 2014, at 12:35 PM, Ryan Persaud ryan_pers...@symantec.com
 wrote:
 
 Hello,
 
 I'm looking to insert log lines from log files into kafka, but I'm
 concerned with handling asynchronous send() failures.  Specifically, if
 some of the log lines fail to send, I want to be notified of the failure so
 that I can attempt to resend them.
 
 Based on previous threads on the mailing list (
 http://comments.gmane.org/gmane.comp.apache.kafka.user/1322), I know that
 the trunk version of kafka supports callbacks for dealing with failures.
 However, the callback function is not passed any metadata that can be used
 by the producer end to reference the original message.  Including the key
 of the message in the RecordMetadata seems like it would be really useful
 for recovery purposes.  Is anyone using the callback functionality to
 trigger resends of failed messages?  If so, how are they tying the
 callbacks to messages?  Is anyone using other methods for handling async
 errors/resending today?  I can’t imagine that I am the only one trying to
 do this.  I asked this question on the IRC channel today, and it sparked
 some discussion, but I wanted to hear from a wider audience.
 
 Thanks for the information,
 -Ryan
 
 
 
 



Re: Handling send failures with async producer

2014-08-26 Thread Jonathan Weeks
I am interested in this very topic as well. Also, can the trunk version of the 
producer be used with an existing 0.8.1.1 broker installation, or does one need 
to wait for 0.8.2 (at least)?
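
For what it's worth, a minimal untested sketch of one way to tie callbacks back 
to their messages with the new producer API (assuming the generic form being 
discussed for 0.8.2) is to capture the record in the callback's closure; the 
class name is made up and the retry policy below is deliberately naive:

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ResendingSender {

    private final KafkaProducer<byte[], byte[]> producer;

    public ResendingSender(Properties props) {
        // props should carry bootstrap.servers plus whatever serializer settings the
        // producer version in use requires for byte[] keys and values
        this.producer = new KafkaProducer<byte[], byte[]>(props);
    }

    public void sendWithRetry(final ProducerRecord<byte[], byte[]> record) {
        producer.send(record, new Callback() {
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception != null) {
                    // The closure holds the original record, so no metadata lookup is needed.
                    // Note: this runs on the producer's I/O thread; a real implementation would
                    // bound retries, skip non-retriable errors, and hand off to a retry queue.
                    sendWithRetry(record);
                }
            }
        });
    }
}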

Thanks,

-Jonathan

On Aug 26, 2014, at 12:35 PM, Ryan Persaud ryan_pers...@symantec.com wrote:

 Hello,
 
 I'm looking to insert log lines from log files into kafka, but I'm concerned 
 with handling asynchronous send() failures.  Specifically, if some of the log 
 lines fail to send, I want to be notified of the failure so that I can 
 attempt to resend them.
 
 Based on previous threads on the mailing list 
 (http://comments.gmane.org/gmane.comp.apache.kafka.user/1322), I know that 
 the trunk version of kafka supports callbacks for dealing with failures.  
 However, the callback function is not passed any metadata that can be used by 
 the producer end to reference the original message.  Including the key of the 
 message in the RecordMetadata seems like it would be really useful for 
 recovery purposes.  Is anyone using the callback functionality to trigger 
 resends of failed messages?  If so, how are they tying the callbacks to 
 messages?  Is anyone using other methods for handling async errors/resending 
 today?  I can’t imagine that I am the only one trying to do this.  I asked 
 this question on the IRC channel today, and it sparked some discussion, but I 
 wanted to hear from a wider audience.
 
 Thanks for the information,
 -Ryan
 



Re: Kafka build for Scala 2.11

2014-08-22 Thread Jonathan Weeks
I hand-applied this patch https://reviews.apache.org/r/23895/diff/ to the kafka 
0.8.1.1 branch and was able to build successfully:

gradlew -PscalaVersion=2.11.2 -PscalaCompileOptions.useAnt=false releaseTarGz -x signArchives

I am testing the jar now, and will let you know if I can run 
producers/consumers against a vanilla 0.8.1.1 broker cluster with it...

-Jonathan

On Aug 22, 2014, at 11:02 AM, Seshadri, Balaji balaji.sesha...@dish.com wrote:

 Hi Team,
 
 We are trying to compile 0.8.1.1 with Scala 2.11 and it's giving me 
 compilation errors.
 
 Please let me know which patch I should apply from the JIRA below. I tried 
 with the latest one and it failed to apply.
 
 https://issues.apache.org/jira/browse/KAFKA-1419
 
 Thanks,
 
 Balaji



Re: Kafka build for Scala 2.11

2014-08-22 Thread Jonathan Weeks
+1 on a 0.8.1.2 release with support for Scala 2.11.x.

-Jonathan


On Aug 22, 2014, at 11:19 AM, Joe Stein joe.st...@stealth.ly wrote:

 The changes are committed to trunk.  We didn't create the patch for 0.8.1.1
 since there were code changes required and we dropped support for Scala 2.8
 ( so we could just upload the artifacts without a vote )
 
 https://issues.apache.org/jira/secure/attachment/12660369/KAFKA-1419_2014-08-07_10%3A52%3A18.patch
 is the version you want.
 
 If this is pressing for folks and can't wait for 0.8.2 or don't want to
 upgrade right away then doing a 0.8.1.2 release is an option...maybe some
 other things too...i.e. empty source jars.   I would prepare and vote on it
 if others would too.
 
 /***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop
 /
 On Aug 22, 2014 1:03 PM, Seshadri, Balaji balaji.sesha...@dish.com
 wrote:
 
 Hi Team,
 
 We are trying to compile 0.8.1.1 with Scala 2.11 and its giving me
 compilation errors.
 
 Please let me know which patch should I apply from below JIRA.I tried with
 latest one and it failed to apply.
 
 https://issues.apache.org/jira/browse/KAFKA-1419
 
 Thanks,
 
 Balaji
 



Re: consumer read from specific partition

2014-08-18 Thread Jonathan Weeks
One tactic that might be worth exploring is to rely on the message key to 
facilitate this.

It would require engineering careful key functions that hash to the desired 
partitions for your topic(s). It would also mean that your consumers for the 
topic would have to evaluate the key and discard messages that aren’t relevant.
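
To make that concrete, a toy sketch (assuming a hashCode-modulo default 
partitioner, which is worth verifying against the producer version you actually 
run) of checking where a candidate key would land:

public class KeyPlanner {

    // Mirrors Kafka's Utils.abs, which avoids the Math.abs(Integer.MIN_VALUE) pitfall.
    static int positive(int n) {
        return n & 0x7fffffff;
    }

    static int partitionFor(String key, int numPartitions) {
        return positive(key.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        // e.g. choose key prefixes so messages meant for one set of consumers land on
        // partitions 0-1 and the rest on 2-3; consumers still filter on the key itself.
        System.out.println(partitionFor("deviceA-42", 4));
    }
}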

The only other option I can think of if you are using the high-level API would 
be finer-grained topics.

Best Regards,

-Jonathan

On Aug 18, 2014, at 9:14 AM, Josh J joshjd...@gmail.com wrote:

 Is it possible to modify and use the high level consumer so that I can
 ignore processing certain partitions?
 
 
 On Mon, Aug 18, 2014 at 5:07 PM, Sharninder sharnin...@gmail.com wrote:
 
 On Mon, Aug 18, 2014 at 7:27 PM, Josh J joshjd...@gmail.com wrote:
 
 You can see an example of using the SimpleConsumer here
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 
 
 Any suggestions on where in the code to modify the high level producer to
 support reading from specific partitions?
 
 
 High level producer? I'm assuming you meant to write the high level
 consumer, in which case it isn't possible. The link above, which has an
 example for reading messages off a specific partition, is for the Simple
 consumer, which ironically, is more complex than the high level consumer.
 
 In short, if you have a usecase where you want to read from a specific
 partition, you will need to implement a simple consumer.
 
 --
 Sharninder
 
 Josh
 .
 
 On Thu, Aug 14, 2014 at 4:27 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:
 
 You can see an example of using the SimpleConsumer here
 
 
 
 https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 
 .
 
 
 
 On Thu, Aug 14, 2014 at 3:23 AM, Sharninder sharnin...@gmail.com
 wrote:
 
 Implement the low level Simple Consumer.
 
 --
 Sharninder
 
 
 
 On Thu, Aug 14, 2014 at 2:16 PM, Josh J joshjd...@gmail.com wrote:
 
 Hi,
 
 Suppose I have N partitions. I would like to have X different consumer
 threads (X < N) read from a specified set of partitions. How can I
 achieve this?
 
 Thanks,
 
 Josh
 
 
 
 
 



Re: Apache webserver access logs + Kafka producer

2014-08-05 Thread Jonathan Weeks
You can look at something like: 

https://github.com/harelba/tail2kafka

(although I don’t know what the effort would be to update it, as it doesn’t 
look like it has been updated in a couple years)

We are using flume to gather logs, and then sending them to a kafka cluster via 
a flume kafka sink — e.g.:

https://github.com/thilinamb/flume-ng-kafka-sink
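
If you would rather roll your own, a minimal untested sketch (topic name and 
broker list are placeholders) is to pipe the log into a small producer, e.g. 
tail -F access_log | java LogLineProducer :

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class LogLineProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer =
            new Producer<String, String>(new ProducerConfig(props));

        // Read log lines from stdin and publish each one as a message.
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        String line;
        while ((line = in.readLine()) != null) {
            producer.send(new KeyedMessage<String, String>("access-logs", line));
        }
        producer.close();
    }
}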

-Jonathan


On Aug 5, 2014, at 1:40 PM, mvs.s...@gmail.com wrote:

 Hi,
 
 I want to collect apache web server logs in real time and send them to a
 Kafka server. Is there any existing Producer available to do this operation?
 If not, can you please provide a way to implement it.
 
 Regards,
 Sree.



Updated Kafka Roadmap?

2014-08-01 Thread Jonathan Weeks
Howdy, 

I was wondering if it would be possible to update the release plan:

https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan

aligned with the feature roadmap:

https://cwiki.apache.org/confluence/display/KAFKA/Index

We have several projects actively using Kafka or planning to use it, and any 
current guidance on the upcoming releases (ZK dependence, producer and consumer 
API/client timing) would be very helpful. For example, is 0.8.2 possible in 
August, or is September more likely?

Also, any chance something like:

https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer

…might make it into 0.9?

Thanks!