Re: The idea of "composite key" to make log compaction more flexible - question / proposal

2017-10-05 Thread Jay Kreps
I think you can do this now by using a custom partitioner, no?

https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/Partitioner.html
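
To make the suggestion concrete, here is a minimal sketch of the partition
choice such a partitioner could make. It is plain Java with no Kafka
dependency; in a real producer I'd expect the same logic to sit in the
partition(...) method of a class implementing the Partitioner interface linked
above, registered through the partitioner.class producer config. The '|'
separator, the class name and the hash are assumptions for illustration only
(Kafka's default partitioner hashes the key bytes with murmur2).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper: models the partition choice a custom Partitioner could make.
final class CompositeKeyPartitioning {
    // Assumed separator between K1 and K2; K1 must not contain it.
    static final char SEPARATOR = '|';

    // Full key (K1, K2) as it would be stored on the record and seen by compaction.
    static String compositeKey(String k1, String k2) {
        return k1 + SEPARATOR + k2;
    }

    // The partition is derived from the K1 part only.
    static int partitionFor(String compositeKey, int numPartitions) {
        int idx = compositeKey.indexOf(SEPARATOR);
        String k1 = idx < 0 ? compositeKey : compositeKey.substring(0, idx);
        byte[] bytes = k1.getBytes(StandardCharsets.UTF_8);
        // Stand-in hash for this sketch; masked so the result is never negative.
        return (Arrays.hashCode(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        String product = compositeKey("42", "product");
        String media = compositeKey("42", "media");
        System.out.println(product + " -> partition " + partitionFor(product, 12));
        System.out.println(media + " -> partition " + partitionFor(media, 12));
    }
}
```

Because the full composite key stays on the record, compaction keeps the
latest (productId, "product") and (productId, "media") entries separately,
while both land in the same partition.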

-Jay

On Mon, Oct 2, 2017 at 6:29 AM Michal Michalski 
wrote:

> Hi,
>
> TL;DR: I'd love to be able to make log compaction more "granular" than just
> per-partition-key, so I was thinking about the concept of a "composite
> key", where partitioning logic is using one part of the key, while
> compaction uses the whole key - is this something desirable / doable /
> worth a KIP?
>
> Longer story / use case:
>
> I'm currently a member of a team working on a project that's using a bunch
> of applications to ingest data to the system (one "entity type" per app).
> Once ingested by each application, since the entities are referring to each
> other, they're all published to a single topic to ensure ordering for later
> processing stages. Because of the nature of the data, for a given set of
> entities related together, there's always a single "master" / "parent"
> entity, whose ID we're using as the partition key; to give an example:
> let's say you have a "product" entity which can have things like "media",
> "reviews", "stocks" etc. associated with it - product ID will be the
> partition key for *all* these entities. However, with this approach we
> simply cannot use log compaction because having e.g. "product", "media" and
> "review" events, all with the same partition key "X", means that the
> compaction process will at some point delete all but one of them, causing
> data loss - only a single entity with key "X" will remain (and that's
> absolutely correct - Kafka doesn't "understand" what the message contains).
>
> We were thinking about introducing something we internally called
> "composite key". The idea is to have a key that's not just a single String
> K, but a pair of Strings: (K1, K2). For specifying the partition that the
> message should be sent to, K1 would be used; however, for log compaction
> purposes, the whole (K1, K2) would be used instead. This way, referring to
> the example above, different entities "belonging" to the same "master
> entity" (product), could be published to that topic with composite keys:
> (productId, "product"), (productId, "media") and (productId, "review"), so
> they all end up in a single partition (specified by K1, which is always:
> productId), but they won't get compacted together, because the K2 part is
> different for them, making the whole "composite key" (K1, K2) different. Of
> course K2 would be optional, so for someone who only needs the default
> behaviour nothing would change.
>
> Since I'm not a Kafka developer and I don't know its internals that well, I
> can't say if this idea is technically feasible or not, but I'd think it is
> - I'd be more afraid of the complexity around backwards compatibility etc.
> and potential performance implications of such a change.
>
> I know that similar behaviour is achievable by using the producer API that
> allows explicitly specifying the partition ID (and the key), but I think
> it's a bit "clunky" (for each message, generate a key that this message
> should normally be using [productId] and somehow "map" that key into a
> partition X; then send that message to this partition X, *but* use the
> "compaction" key instead [productId, entity type] as the message key) and
> it's something that could be abstracted away from the user.
>
> Thoughts?
>
> Question to Kafka users: Is this something that anyone here would find
> useful? Is anyone here dealing with a similar problem?
>
> Question to Kafka maintainers: Is this something that you could potentially
> consider a useful feature? Would it be worth a KIP? Is something like this
> (technically) doable at all?
>
> --
> Kind regards,
> Michał Michalski
>


Re: Event sourcing with Kafka and Kafka Streams. How to deal with atomicity

2017-07-21 Thread Jay Kreps
Hey Chris,

I heard a similar complaint from a few people. I am quite ignorant about
event sourcing and don't feel I understand the relationship fully but I am
interested in understanding a little better what you are saying.

I think we see the world this way:

   1. You store the log of primary events in Kafka
   2. You can create queryable materialized views/indexes derived off of
   these events in Kafka Streams, which I believe would include what in event
   sourcing is called aggregates.

If you change the logic by which aggregates are computed off the raw events
you would rerun the streams app that derived it to recompute the derived
state from the event log. Since this is Kafka this can be done in a
blue/green fashion where you keep the old version of the app running and
start a new version in parallel which recomputes the state from scratch,
then cut over to the newly derived version of your app. In other words the
state is part of the app (which may be split over many instances) not part
of some remote db shared by many versions of the app.

The three things that I have heard are missing:

   1. Your query indexes in the streams app are only eventually consistent
   with writes to the Kafka topic. There is no read-after-write consistency.
   2. You can't compute individual aggregates on their own. That is if you
   have an Order aggregate you can recompute the set of orders from scratch
   but you can't recompute just Order 12453.
   3. The query functionality in streams is quite limited.

Of these three I think (1) and (3) are real limitations for many use cases.

I am actually not sure if (2) is a problem. In general, I do think that if
you change your logic for deriving aggregates from events, the only way to
correctly regenerate your state is to recompute off the event log, right?
Doing this in a one-off way for just some entities may result in derived
state that doesn't match the code and input events you have in odd ways.
Anyhow not sure if that is what you are saying is missing but other people
have said that.

Does that match what you are saying? I actually am too ignorant of this
area and its terminology to fully understand what you mean by the three
examples you give.

-Jay


On Fri, Jul 21, 2017 at 6:51 AM, Chris Richardson  wrote:

> Hi,
>
> I like Kafka but I don't understand the claim that it can be used for Event
> Sourcing (here 
> and here )
>
> One part of the event sourcing is the ability to subscribe to events
> published by aggregates and clearly Kafka works well there.
>
> But the other part of Event Sourcing is "database" like functionality,
> which includes
>
>    - findEventsByPrimaryKey() - needed to be able to reconstruct an
>      aggregate from its events - the essence of event sourcing
>    - Atomic updates - for updating aggregates (findEventsByPrimaryKey()
>      - business logic - insertNewEvents()) in order to handle this kind of
>      scenario.
>
> The approach we have taken is to implement event sourcing using a database
> and Kafka.
> For instance: see
> https://blog.eventuate.io/2016/10/06/eventuate-local-
> event-sourcing-and-cqrs-with-spring-boot-apache-kafka-and-mysql/
>
> Chris
>
> --
> Learn microservices - http://learnmicroservices.io
> Microservices application platform http://eventuate.io
>
>
> On Fri, Jul 21, 2017 at 12:25 AM, José Antonio Iñigo <
> joseantonio.in...@gmail.com> wrote:
>
> > Hi everybody,
> >
> > I have been struggling with this problem for quite a while now, resorting
> > to stackoverflow
> >  > sourcing-apache-kafka-kafka-streams-how-to-assure-atomicity-transa>
> > for some help with no success. I am hoping that here I'll get a more
> > authoritative answer from experienced Kafka users.
> >
> > This is the summary of my problem:
> >
> > - I am developing an application based on Spring Boot Microservices for a
> > shopping domain.
> > - I want to use Event Sourcing, having Kafka to register the events and
> > Kafka Streams API stores to materialize the views.
> > - To simplify the scenario we'll consider only two domains: Orders and
> > Products.
> > - The conflicting part:
> >    1) OrderService publishes an OrderPlaced event indicating a productId
> >       and the quantity.
> >    2) ProductService consumes the event and queries (with an interactive
> >       query) its Kafka Streams Store (ProductsStore) to check the
> >       availability of the product. If there is availability it publishes a
> >       ProductReserved event with productId and quantity:
> >
> > if("OrderPlaced".equals(event.get("eventType"))){
> >
> > Order order = new Order();
> > order.setId((String)event.get("orderId"));
> > order.setProductId((Integer)(event.get("productId")));
> > order.setUid(event.get("uid").toString());
> >
> > // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY

Re: Consumer throughput drop

2017-07-20 Thread Jay Kreps
I suspect this is on Linux right?

The way Linux works is that it uses a percentage of memory to buffer new
writes; at a certain point it decides it has too much buffered data and gives
high priority to writing that out. The good news about this is that the writes
are very linear, well laid out, and high-throughput. The problem is that
it leads to a bit of see-saw behavior.

Now the drop in performance isn't wrong. When your disk is writing data out
it is doing work, so the read throughput will be higher when you are just
reading and not writing than when you are doing both simultaneously. You
can't get the no-writing performance when you are also writing (unless you
add I/O capacity).

But still these big see-saws in performance are not ideal. You'd rather
have more constant performance all the time rather than have Linux bounce
back and forth between writing nothing and then frantically writing full bore.
Fortunately Linux provides a set of pagecache tuning parameters that let
you control this a bit.

I think these docs cover some of the parameters:
https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/s-memory-tunables.html
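
As a rough illustration, the writeback-related tunables live under
/proc/sys/vm on Linux. The sketch below just reads the current values so you
can see what the box is doing before changing anything; the class name is made
up, and the choice of these four tunables is my assumption about which ones
matter most here.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.OptionalLong;

// Hypothetical helper that reads Linux dirty-page (writeback) tunables.
final class DirtyPageSettings {
    // Returns the numeric value of /proc/sys/vm/<name>, or empty if it cannot be read
    // (for example on a non-Linux machine or for an unknown tunable).
    static OptionalLong read(String name) {
        try {
            Path p = Paths.get("/proc/sys/vm", name);
            String text = new String(Files.readAllBytes(p), StandardCharsets.UTF_8).trim();
            return OptionalLong.of(Long.parseLong(text));
        } catch (IOException | RuntimeException e) {
            return OptionalLong.empty();
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "dirty_background_ratio",    // % of memory at which background writeback starts
            "dirty_ratio",               // % of memory at which writers are forced to flush
            "dirty_expire_centisecs",    // age after which dirty data is eligible for writeback
            "dirty_writeback_centisecs"  // how often the flusher threads wake up
        };
        for (String name : names) {
            OptionalLong value = read(name);
            System.out.println("vm." + name + " = "
                    + (value.isPresent() ? String.valueOf(value.getAsLong()) : "unavailable"));
        }
    }
}
```

Lowering vm.dirty_background_ratio (for example with sysctl -w) should make
background flushing start earlier and in smaller bursts; the right values
depend on the workload, so treat any number as something to measure rather
than a recommendation.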

-Jay

On Thu, Jul 20, 2017 at 10:24 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Hi guys,
>
> I’m relatively new to Kafka’s world. I have an issue I describe below,
> maybe you can help me understand this behaviour.
>
> I’m running a benchmark using the following setup: one producer sends data
> to a topic and concurrently one consumer pulls and writes it to another
> topic.
> Measuring the consumer throughput, I observe values around 500K records/s
> only until the system’s cache gets filled - from this moment the consumer
> throughput drops to ~200K (2.5 times lower).
> Looking at disk usage, I observe disk read I/O which corresponds to the
> moment the consumer throughput drops.
> After some time, I kill the producer and immediately I observe the
> consumer throughput goes up to initial values ~ 500K records/s.
>
> What can I do to avoid this throughput drop?
>
> Attached an image showing disk I/O and CPU usage. I have about 128GB RAM
> on that server which gets filled at time ~2300.
>
> Thanks,
> Ovidiu
>
>


Re: KIP-162: Enable topic deletion by default

2017-07-19 Thread Jay Kreps
+1

On Sat, May 27, 2017 at 11:04 AM, Gwen Shapira  wrote:

> Thanks Vahid,
>
> Do you mind if we leave the command-line out of scope for this?
>
> I can see why adding confirmations, options to bypass confirmations, etc
> would be an improvement. However, I've seen no complaints about the current
> behavior of the command-line and the KIP doesn't change it at all. So I'd
> rather address things separately.
>
> Gwen
>
> On Fri, May 26, 2017 at 8:10 PM Vahid S Hashemian <
> vahidhashem...@us.ibm.com>
> wrote:
>
> > Gwen, thanks for the KIP.
> > It looks good to me.
> >
> > Just a minor suggestion: It would be great if the command asks for a
> > confirmation (y/n) before deleting the topic (similar to how removing
> > ACLs works).
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   Gwen Shapira 
> > To: "d...@kafka.apache.org" , Users
> > 
> > Date:   05/26/2017 07:04 AM
> > Subject:KIP-162: Enable topic deletion by default
> >
> >
> >
> > Hi Kafka developers, users and friends,
> >
> > I've added a KIP to improve our out-of-the-box usability a bit:
> > KIP-162: Enable topic deletion by default:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-162+-+Enable+topic+deletion+by+default
> >
> >
> > Pretty simple :) Discussion and feedback are welcome.
> >
> > Gwen
> >
> >
> >
> >
> >
>


Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Jay Kreps
I think the question is when do you actually *want* processing time
semantics? There are definitely times when it's safe to assume the two are
close enough that a little lossiness doesn't matter much, but it is pretty
hard to make assumptions about when the processing time is, and it has been
hard for us to think of a use case where it's actually desirable.

I think mostly what we've seen is confusion about the core concepts:

   - stream -- immutable events that occur
   - tables (including windows) -- current state of the world

If the root problem is confusion, adding knobs never makes it better. If the
root problem is that we're missing important use cases that justify the
additional knobs then I think it's good to try to really understand them. I
think there could be use cases around systems that don't take updates;
examples would be email, twitter, and some metrics stores.

One solution that would be less complexity inducing than allowing new
semantics, but might help with the use cases we need to collect, would be
to add a new operator in the DSL. Something like .freezeAfter(30,
TimeUnit.SECONDS) that collects all updates for a given window and both
emits and enforces a single output 30 seconds after the advancement
of stream time and remembers that it is emitted, suppressing all further
output (so the output is actually a KStream). This might or might not
depend on wall clock time. Perhaps this is in fact what you are proposing?
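
To show the semantics I have in mind, here is a small stand-alone sketch. It
is not Kafka Streams code and freezeAfter does not exist in the DSL; the class
and method names are made up, windows are identified only by their end
timestamp, and I am assuming "30 seconds after" means 30 seconds of stream
time past the window end.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of a "freeze after" operator: one final output per window.
final class FreezeAfterSketch<V> {
    private final long freezeAfterMs;
    // Window end timestamp -> latest value seen for that window (not yet emitted).
    private final TreeMap<Long, V> pending = new TreeMap<>();
    // Window ends that were already emitted. Unbounded in this sketch; a real
    // operator would need retention for this set.
    private final Set<Long> frozen = new HashSet<>();

    FreezeAfterSketch(long freezeAfterMs) {
        this.freezeAfterMs = freezeAfterMs;
    }

    // Records an update; returns false if the window is frozen and the update is suppressed.
    boolean update(long windowEndMs, V value) {
        if (frozen.contains(windowEndMs)) {
            return false;
        }
        pending.put(windowEndMs, value);
        return true;
    }

    // Advances stream time and returns the single final value of every window
    // whose freeze point (window end + freezeAfterMs) has been reached.
    List<V> advanceTo(long streamTimeMs) {
        List<V> finals = new ArrayList<>();
        Iterator<Map.Entry<Long, V>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, V> e = it.next();
            if (streamTimeMs < e.getKey() + freezeAfterMs) {
                break; // entries are sorted, so later windows are not due either
            }
            finals.add(e.getValue());
            frozen.add(e.getKey());
            it.remove();
        }
        return finals;
    }

    public static void main(String[] args) {
        FreezeAfterSketch<Integer> op = new FreezeAfterSketch<>(30_000L);
        op.update(60_000L, 1);
        op.update(60_000L, 5);
        System.out.println("emitted: " + op.advanceTo(90_000L));
        System.out.println("late update accepted: " + op.update(60_000L, 9));
    }
}
```

The point of the sketch is that downstream sees an append-only sequence of
final values rather than a changelog of updates, which is why the output
would be a KStream.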

-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple times,
> right?
>
> Personally, I agree totally with the philosophy of "no final aggregation",
> as expressed by Eno's post, but IMO that is predicated totally on
> event-time semantics.
>
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record just
> falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
>
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is another
> case where processing-time semantics warrant explicit handling in the api -
> but of course, only if there's sufficient user demand for this.
>
> What I could imagine is a new type of time window (ProcessingTimeWindow?),
> that if used in an aggregation, the underlying processor would force the
> WallclockTimestampExtractor (KAFKA-4144 enables that) and would use the
> system-time punctuation (KIP-138) to send the final aggregation value once
> the window has expired and could be configured to not send intermediate
> updates while the window was open.
>
> Of course this is just a helper for the users, since they can implement it
> all themselves using the low-level API, as Matthias pointed out already.
> Just seems there's recurring interest in this.
>
> Again, this only makes sense for processing time semantics. For event-time
> semantics I find the arguments for "no final aggregation" totally
> convincing.
>
>
> Cheers,
>
> Michał
>
> On 16/06/17 00:08, Matthias J. Sax wrote:
>
> Hi Paolo,
>
> This SO question might help, too:
> https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>
> For Streams, the basic model is based on "change" and we report updates
> to the "current" result immediately reducing latency to a minimum.
>
> Last, if you say it's going to fall into the next window, you won't get
> event time semantics but you fall back to processing time semantics, which
> cannot provide exact results.
>
> If you really want to trade off correctness versus getting (late)
> updates and want to use processing time semantics, you should configure
> WallclockTimestampExtractor and implement an "update deduplication"
> operator using table.toStream().transform(). You can attach state to
> your transformer and store all updates there (i.e., newer updates overwrite
> older updates). Punctuations allow you to emit "final" results for
> windows for which "window end time" passed.
>
>
> -Matthias
>
> On 6/15/17 9:21 AM, Paolo Patierno wrote:
>
> Hi Eno,
>
>
> regarding closing window I think that it's up to the streaming application. I 
> mean ...
>
> If I want something like I described, I know that a value outside my 5 
> seconds window will be taken into account for the next processing (in the 
> next 5 seconds). I don't think I'm losing a record, I am aware that this 
> record will fall in the next "processing" window. Btw I'll take a look at 
> your article ! Thanks !
>
>
> Paolo
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : 


Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-06-02 Thread Jay Kreps
>>>>>> we advocate to process such a dead letter queue then, e.g. how to allow
>>>>>> for retries with backoff ("If the first record in the dead letter queue
>>>>>> fails again, then try the second record for the time being and go back
>>>>>> to the first record at a later time").  Jay and Jan already alluded to
>>>>>> ordering problems that will be caused by dead letter queues. As I said,
>>>>>> retries might be out of scope but perhaps the implications should be
>>>>>> considered if possible?
>>>>>>
>>>>>> Also, I wrote the text below before reaching the point in the
>>>>>> conversation that this KIP's scope will be limited to exceptions in the
>>>>>> category of poison pills / deserialization errors.  But since Jay
>>>>>> brought up user code errors again, I decided to include it again.
>>>>>>
>>>>>> snip
>>>>>> A meta comment: I am not sure about this split between the code for the
>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure path (using
>>>>>> exception handlers).  In Scala, for example, we can do:
>>>>>>
>>>>>>   scala> val computation = scala.util.Try(1 / 0)
>>>>>>   computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)
>>>>>>
>>>>>>   scala> computation.getOrElse(42)
>>>>>>   res2: Int = 42
>>>>>>
>>>>>> Another example with Scala's pattern matching, which is similar to
>>>>>> `KStream#branch()`:
>>>>>>
>>>>>>   computation match {
>>>>>>     case scala.util.Success(x) => x * 5
>>>>>>     case scala.util.Failure(_) => 42
>>>>>>   }
>>>>>>
>>>>>> (The above isn't the most idiomatic way to handle this in Scala, but
>>>>>> that's not the point I'm trying to make here.)
>>>>>>
>>>>>> Hence the question I'm raising here is: Do we want to have an API where
>>>>>> you code "the happy path", and then have a different code path for
>>>>>> failures (using exceptions and handlers);  or should we treat both
>>>>>> Success and Failure in the same way?
>>>>>>
>>>>>> I think the failure/exception handling approach (as proposed in this
>>>>>> KIP) is well-suited for errors in the category of deserialization
>>>>>> problems aka poison pills, partly because the (default) serdes are
>>>>>> defined through configuration (explicit serdes however are defined
>>>>>> through API calls).
>>>>>>
>>>>>> However, I'm not yet convinced that the failure/exception handling
>>>>>> approach is the best idea for user code exceptions, e.g. if you fail to
>>>>>> guard against NPE in your lambdas or divide a number by zero.
>>>>>>
>>>>>>   scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>   stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>
>>>>>>   // Here: Fallback to a sane default when encountering failed records
>>>>>>   scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>   res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>
>>>>>>   // Here: Skip over failed records
>>>>>>   scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>   res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>
>>>>>> The above is more natural to me than using error handlers to define how
>>>>>> to deal with failed records (here, the value `3` causes an arithmetic
>>>>>> exception).  Again, it might help the KIP if we added an end-to-end
>>>>>> example for such user code errors.
>>>>>> snip
>>>>>>
>>>>>>
>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jay,
>>>>>>>
>>>>>>> Eno mentioned that he will narrow down the scope to only ConsumerRecord
>>>>>>> deserialisation.
>>>>>>>
>>>>>>> I am working with Database Changelogs only. I would really not like to
>>>>>>> see a dead letter queue or something similar. How am I expected to get
>>>>>>> these back in order? Just grind to a halt and call me on the weekend.
>>>>>>> I'll fix it then in a few minutes rather than spend 2 weeks ordering
>>>>>>> dead letters (where reprocessing might be even the faster fix).
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>>
>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>
>>>>>>>>   - I think we should hold off on retries unless we have worked out
>>>>>>>>   the full usage pattern, people can always implement their own. I
>>>>>>>>   think the idea is that you send the message to some kind of dead
>>>>>>>>   letter queue and then replay these later. This obviously destroys
>>>>>>>>   all semantic guarantees we are working hard to provide right now,
>>>>>>>>   which may be okay.


Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-29 Thread Jay Kreps
Hey Eno,

I think this makes sense. I do think people who spend time running
production stream processing systems will, over time, end up strongly
preferring the current behavior of failing and fixing the root problem
rather than skipping, but we don't need to force this on people as long as
the default is to fail.

One thing I'm confused about is the scope of the proposal. I think the plan
is that this would cover all exceptions that occur whether in serializers
or ANY user code? Is that right? So if I do stream.map(x =>
x.header.timestamp) and that throws a NullPointerException, this would be
triggered? If so what I understand is that what is passed in to me is the
original consumer record, not the value x that produced the null pointer
exception? Is that right? If this understanding is correct then the
name RecordExceptionHandler should maybe be something like
ProcessingExceptionHandler since the exception isn't necessarily directly
tied to an input Record, right?

A couple of other comments:

   - It's important we maintain the original stack trace when we rethrow
   the exception (probably obvious, but thought I'd mention it)
   - As a matter of style I'd advocate for making a single
   DefaultExceptionHandler which logs the error and adding configs for this to
   control when (if ever) it fails. This will allow adding additional useful
   options in a way that can be combined (such as the dead letter thing,
   retries, etc). Basically the point is that these facilities aren't
   "either/or". Also you mention adding configs for these in the existing
   proposal, it'd be good to say what the configs are.
   - I think we should hold off on retries unless we have worked out the
   full usage pattern, people can always implement their own. I think the idea
   is that you send the message to some kind of dead letter queue and then
   replay these later. This obviously destroys all semantic guarantees we are
   working hard to provide right now, which may be okay.
   - I agree that the LogAndThresholdExceptionHandler is closest to what
   most people think they want. I think making the exception handler stateful
   is probably fine since this is inherently an approximate threshold. I do
   think this is a bit more complex than it sounds though since you'll
   obviously need to compute some kind of cheap running rate. Obviously the
   two failure modes you'd need to avoid are that 1/1 failures = 100% OR
   conversely that it runs successfully for one year and then fails 100% of
   the time but that isn't caught because of the excess prior history.
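
As a sketch of what a "cheap running rate" could look like, here is an
exponentially decayed failure rate with a minimum sample count. This is only
one possible approach, not what the KIP proposes; the class name, the
parameters (alpha, minSamples, threshold) and their values are assumptions
for illustration.

```java
// Hypothetical running failure rate for a threshold-based exception handler.
final class DecayingFailureRate {
    private final double alpha;      // weight of the newest observation, in (0, 1]
    private final long minSamples;   // never trip before this many records were seen
    private final double threshold;  // trip when the decayed rate exceeds this value
    private double rate = 0.0;
    private long seen = 0;

    DecayingFailureRate(double alpha, long minSamples, double threshold) {
        this.alpha = alpha;
        this.minSamples = minSamples;
        this.threshold = threshold;
    }

    // Records one outcome and returns true if processing should now fail.
    boolean record(boolean failed) {
        seen++;
        // Old history fades geometrically, so a long healthy past cannot hide
        // a sudden run of failures.
        rate = (1.0 - alpha) * rate + alpha * (failed ? 1.0 : 0.0);
        // The minimum sample count avoids tripping on "1 failure out of 1 record".
        return seen >= minSamples && rate > threshold;
    }

    double currentRate() {
        return rate;
    }

    public static void main(String[] args) {
        DecayingFailureRate tracker = new DecayingFailureRate(0.1, 20, 0.5);
        System.out.println("trip on first failure: " + tracker.record(true));
        for (int i = 0; i < 10_000; i++) {
            tracker.record(false);
        }
        int failures = 0;
        while (!tracker.record(true)) {
            failures++;
        }
        System.out.println("tripped after " + (failures + 1) + " consecutive failures");
    }
}
```

The state is two numbers per handler instance, which fits the point that the
threshold is inherently approximate.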

-Jay


On Thu, May 25, 2017 at 2:47 AM, Eno Thereska 
wrote:

> Hi there,
>
> I’ve added a KIP on improving exception handling in streams:
> KIP-161: streams record processing exception handlers.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+record+processing+exception+handlers
>
> Discussion and feedback is welcome, thank you.
> Eno


Re: Producer Async Issue

2017-05-28 Thread Jay Kreps
I think setting max.block.ms=0 does what you want.
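
A minimal sketch of the producer settings, using only java.util.Properties so
it runs without Kafka on the classpath. The class name and the broker address
are placeholders; the config keys and serializer class names are the standard
ones. My understanding is that with max.block.ms=0 a send() that would have
waited for metadata or buffer space reports an error right away instead
(thrown or delivered through the callback/future, depending on the client
version), so the caller has to handle that case, and the very first sends may
fail until metadata has been fetched.

```java
import java.util.Properties;

// Hypothetical config holder; pass the result to new KafkaProducer<>(props).
final class NonBlockingProducerConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Do not block in send() waiting for metadata or for buffer space.
        props.put("max.block.ms", "0");
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" is a placeholder broker address.
        build("localhost:9092").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```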

-Jay

On Sat, May 27, 2017 at 12:40 PM, Abhimanyu Nagrath <
abhimanyunagr...@gmail.com> wrote:

> Hi Hans,
>
> What I meant by asynchronous is that when my Kafka broker is down
> and I try to produce a message, the call gets stuck until the
> configured max.block.ms elapses, and only after that is further code
> executed. What I am looking for is that, whether the broker is down or
> not, it should not get stuck.
>
>
>
> Regards,
> Abhimanyu
>
> On Sat, May 27, 2017 at 10:30 PM, Hans Jespersen 
> wrote:
>
> > The producer is asynchronous (assuming you mean the Java Producer)
> >
> > https://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
> >
> > -hans
> >
> > > On May 27, 2017, at 5:15 AM, Abhimanyu Nagrath <
> > abhimanyunagr...@gmail.com> wrote:
> > >
> > > Hi,
> > > I am using a Kafka 0.10.2 single node cluster and I want to know whether
> > > the Kafka producer is completely asynchronous. If it is not, what
> > > configuration do I need to change in order to make it completely
> > > asynchronous?
> > >
> > >
> > >
> > > Regards,
> > > Abhimanyu
> >
>


Re: Kafka Streams Usage Patterns

2017-05-27 Thread Jay Kreps
This is great!

-Jay

On Sat, May 27, 2017 at 12:47 PM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> Hi all,
>
> I've updated the wiki page with a draft pattern for consecutively growing
> time-windowed aggregations which was discussed some time ago on this
> mailing list.
>
> I'm yet to add the part that cleans up the stores using punctuations. Stay
> tuned.
>
>
> On a somewhat similar subject, I've been working to implement the
> following requirements:
>
> * transaction sums per customer session (simple, just extract non-expired
> session-windowed aggregates from a SessionStore using interactive queries)
>
> * global transaction sums for all *currently active* customer sessions
>
> The second bit proved non-trivial, because session-windowed KTables (or
> any windowed KTables for that matter) don't notify downstream when a window
> expires. And I can't use punctuate until KIP-138 is implemented because
> stream time punctuation is no good in this case (records can stop coming),
> reliable system time punctuation would be needed.
>
> Below is how I implemented this, I'm yet to test it thoroughly.
>
> I wonder if anyone knows of an easier way of achieving the same.
>
> If so, I'm looking forward to suggestions. If not, I'll add that to the
> patterns wiki page too, in case someone else finds it useful.
>
>
> builder
>   .stream(/*key serde*/, /*transaction serde*/, "transaciton-topic")
>
>   .groupByKey(/*key serde*/, /*transaction serde*/)
>
>   .aggregate(
>     () -> /*empty aggregate*/,
>     aggregator(),
>     merger(),
>     SessionWindows.with(SESSION_TIMEOUT_MS).until(SESSION_TIMEOUT_MS*2),
>     /* aggregate serde */,
>     txPerCustomerSumStore() // this store can be queried for per customer session data
>   )
>
>   .toStream()
>
>   // tombstones only come when a session is merged into a bigger session, so ignore them
>   .filter(((key, value) -> value != null))
>
>   // the below map/groupByKey/reduce operations are to only propagate
>   // updates to the *latest* session per customer to downstream
>
>   // this moves timestamp from the windowed key into the value
>   // so that we can group by customerId only and reduce to the later value
>   .map((windowedCustomerId, agg) ->
>     new KeyValue<>(
>       windowedCustomerId.key(), // just customerId
>       // this is just like a tuple2 but with nicely named accessors: timestamp() and aggs()
>       new WindowedAggsImpl(
>         windowedCustomerId.window().end(),
>         agg
>       )
>     )
>   )
>   .groupByKey( /*key serde*/, /*windowed aggs serde*/ ) // key is just customerId
>   // take later session value and ignore any older - downstream only cares about *current* sessions
>   .reduce(
>     (val, agg) -> val.timestamp() > agg.timestamp() ? val : agg,
>     TimeWindows.of(SESSION_TIMEOUT_MS).advanceBy(SESSION_TIMOUT_DELAY_TOLERANCE_MS),
>     "latest-session-windowed"
>   )
>
>   // calculate totals with maximum granularity, which is per-partition
>   .groupBy((windowedCustomerId, timeAndAggs) ->
>     new KeyValue<>(
>       new Windowed<>(
>         // KIP-159 would come in handy here, to access partition number instead
>         windowedCustomerId.key().hashCode() % PARTITION_COUNT_FOR_TOTALS,
>         // will use this in the interactive queries to pick the oldest not-yet-expired window
>         windowedCustomerId.window()
>       ),
>       timeAndAggs.aggs()
>     ),
>     new SessionKeySerde<>(Serdes.Integer()),
>     /* aggregate serde */
>   )
>
>   .reduce(
>     (val, agg) -> agg.add(val),
>     (val, agg) -> agg.subtract(val),
>     txTotalsStore() // this store can be queried to get totals per partition for all active sessions
>   );
>
> // this global table puts per partition totals on every node, so that they can be easily
> // summed for global totals, picking the oldest not-yet-expired window
> builder.globalTable(
>   new SessionKeySerde<>(Serdes.Integer()),
>   /* aggregate serde */,
>   changelogTopicForStore(TRANSACTION_TOTALS), "totals");
>
> TODO: put in StreamPartitioners (with KTable.through variants added in
> KAFKA-5045) to avoid re-partitioning where I know it's unnecessary.
>
> The idea behind the % PARTITION_COUNT_FOR_TOTALS bit is that I want to
> first do summation with max parallelism and minimize the work needed
> downstream. So I calculate a per-partition sum first to limit the updates
> that the totals topic will receive and the summing work done by the
> interactive queries on the global store. Is this a good way of going about
> it?
>
> Thanks,
>
> Michał
>
> On 09/05/17 18:31, Matthias J. Sax wrote:
>
> Hi,
>
> I started a new Wiki page to collect some common usage patterns for
> Kafka Streams.
>
> Right now, it contains a quick example on "how to compute average". Hope
> we can collect more examples like this!
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns
>
>
> -Matthias
>
>
>
> --
>  Michal 

Re: [VOTE] KIP-156 Add option "dry run" to Streams application reset tool

2017-05-09 Thread Jay Kreps
+1
On Tue, May 9, 2017 at 3:41 PM BigData dev  wrote:

> Hi, Everyone,
>
> Since this is a relatively simple change, I would like to start the voting
> process for KIP-156: Add option "dry run" to Streams application reset tool
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69410150
>
>
> The vote will run for a minimum of 72 hours.
>
>
> Thanks,
>
> Bharat
>


Re: [ANNOUNCE] New committer: Rajini Sivaram

2017-04-24 Thread Jay Kreps
Congrats Rajini!

On Mon, Apr 24, 2017 at 2:06 PM, Gwen Shapira  wrote:

> The PMC for Apache Kafka has invited Rajini Sivaram as a committer and we
> are pleased to announce that she has accepted!
>
> Rajini contributed 83 patches, 8 KIPs (all security and quota
> improvements) and a significant number of reviews. She is also on the
> conference committee for Kafka Summit, where she helped select content
> for our community event. Through her contributions she's shown good
> judgement, good coding skills, willingness to work with the community on
> finding the best solutions and very consistent follow through on her work.
>
> Thank you for your contributions, Rajini! Looking forward to many more :)
>
> Gwen, for the Apache Kafka PMC
>


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-22 Thread Jay Kreps
> > > >>>>> -Matthias
> > > >>>>>
> > > >>>>>
> > > >>>>> On 3/14/17 3:36 AM, Michael Noll wrote:
> > > >>>>>> I see Jay's point, and I agree with much of it -- notably about
> > > >>>>>> being careful which concepts we do and do not expose, depending
> > > >>>>>> on which user group / user type is affected.  That said, I'm not
> > > >>>>>> sure yet whether or not we should get rid of "Topology" (or a
> > > >>>>>> similar term) in the DSL.
> > > >>>>>>
> > > >>>>>> For what it's worth, here's how related technologies define/name
> > > >>>>>> their "topologies" and "builders".  Note that, in all cases, it's
> > > >>>>>> about constructing a logical processing plan, which then is being
> > > >>>>>> executed/run.
> > > >>>>>>
> > > >>>>>> - `Pipeline` (Google Dataflow/Apache Beam)
> > > >>>>>>     - To add a source you first instantiate the Source (e.g.
> > > >>>>>>       `TextIO.Read.from("gs://some/inputData.txt")`), then attach
> > > >>>>>>       it to your processing plan via `Pipeline#apply()`.  This
> > > >>>>>>       setup is a bit different to our DSL because in our DSL the
> > > >>>>>>       builder does both, i.e. instantiating + auto-attaching to
> > > >>>>>>       itself.
> > > >>>>>>     - To execute the processing plan you call
> > > >>>>>>       `Pipeline#execute()`.
> > > >>>>>> - `StreamingContext` (Spark): This setup is similar to our DSL.
> > > >>>>>>     - To add a source you call e.g.
> > > >>>>>>       `StreamingContext#socketTextStream("localhost", )`.
> > > >>>>>>     - To execute the processing plan you call
> > > >>>>>>       `StreamingContext#execute()`.
> > > >>>>>> - `StreamExecutionEnvironment` (Flink): This setup is similar to
> > > >>>>>>   our DSL.
> > > >>>>>>     - To add a source you call e.g.
> > > >>>>>>       `StreamExecutionEnvironment#socketTextStream("localhost", )`.
> > > >>>>>>     - To execute the processing plan you call
> > > >>>>>>       `StreamExecutionEnvironment#execute()`.
> > > >>>>>> - `Graph`/`Flow` (Akka Streams), as a result of composing Sources
> > > >>>>>>   (~ `KStreamBuilder.stream()`) and Sinks (~ `KStream#to()`) into
> > > >>>>>>   Flows, which are [Runnable]Graphs.
> > > >>>>>>     - You instantiate a Source directly, and then compose the
> > > >>>>>>       Source with Sinks to create a RunnableGraph: see signature
> > > >>>>>>       `Source#to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat]`.
> > > >>>>>>     - To execute the processing plan you call `Flow#run()`.
> > > >>>>>>
> > > >>>>>> In our DSL, in comparison, we do:
> > > >>>>>>
> > > >>>>>> - `KStreamBuilder` (Kafka Streams API)
> > > >>>>>>     - To add a source you call e.g.
> > > >>>>>>       `KStreamBuilder#stream("input-topic")`.
> > > >>>>>>     - To execute the processing plan you create a `KafkaStreams`
> > > >>>>>>       instance from `KStreamBuilder` (where the builder will
> > > >>>>>>       instantiate the topology = processing plan to be executed),
> > > >>>>>>       and then call `KafkaStreams#start()`.  Think of
> > > >>>>>>       `KafkaStreams` as our runner.
> > > >>>>>>

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Jay Kreps
Hey Matthias,

Makes sense. I'm more advocating for removing the word topology than for any
particular new replacement.

-Jay

On Mon, Mar 13, 2017 at 12:30 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Jay,
>
> thanks for your feedback
>
> > What if instead we called it KStreamsBuilder?
>
> That's the current name and I personally think it's not the best one.
> The main reason why I don't like KStreamsBuilder is that we have the
> concepts of KStreams and KTables, and the builder creates both. However,
> the name puts the focus on KStream and devalues KTable.
>
> I understand your argument, and I am personally open to removing the
> "Topology" part and naming it "StreamsBuilder". Not sure what others
> think about this.
>
>
> About Processor API: I like the idea in general, but I think it's out
> of scope for this KIP. KIP-120 focuses on removing leaking internal APIs
> and on cleaning up how our API reflects some concepts.
>
> However, I added your idea to the API discussion Wiki page and we can take
> it from there:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Discussions
>
>
>
> -Matthias
>
>
> On 3/13/17 11:52 AM, Jay Kreps wrote:
> > Two things:
> >
> >   1. This is a minor thing but the proposed new name for KStreamBuilder
> >   is StreamsTopologyBuilder. I actually think we should not put topology
> >   in the name as topology is not a concept you need to understand at the
> >   kstreams layer right now. I'd think of three categories of concepts:
> >   (1) concepts you need to understand to get going even for a simple
> >   example, (2) concepts you need to understand to operate and debug a
> >   real production app, (3) concepts we truly abstract and you don't need
> >   to ever understand. I think in the kstream layer topologies are
> >   currently category (2), and this is where they belong. By introducing
> >   the name in even the simplest example it means the user has to go read
> >   about topologies to really understand even this simple snippet. What if
> >   instead we called it KStreamsBuilder?
> >   2. For the processor api, I think this api is mostly not for end users.
> >   However there are a couple of cases where it might make sense to expose
> >   it. I think users coming from Samza, or JMS's MessageListener (
> >   https://docs.oracle.com/javaee/7/api/javax/jms/MessageListener.html)
> >   understand a simple callback interface for message processing. In fact,
> >   people often ask why Kafka's consumer doesn't provide such an
> >   interface. I'd argue we do, it's KafkaStreams. The only issue is that
> >   the processor API documentation is a bit scary for a person
> >   implementing this type of api. My observation is that people using this
> >   style of API don't do a lot of cross-message operations, they just do
> >   single message operations and use a database for anything that spans
> >   messages. They also don't factor their code into many MessageListeners
> >   and compose them, they just have one listener that has the complete
> >   handling logic. Say I am a user who wants to implement a single
> >   Processor in this style. Do we have an easy way to do that today
> >   (either with the .transform/.process methods in kstreams or with the
> >   topology apis)? Is there anything we can do in the way of trivial
> >   helper code to make this better? Also, how can we explain that pattern
> >   to people? I think currently we have pretty in-depth docs on our apis
> >   but I suspect a person trying to figure out how to implement a simple
> >   callback might get a bit lost trying to figure out how to wire it up. A
> >   simple five line example in the docs would probably help a lot. Not
> >   sure if this is best addressed in this KIP or is a side comment.
> >
> > Cheers,
> >
> > -Jay
> >
> > On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi All,
> >>
> >> I did prepare a KIP to do some cleanup of Kafka's Streaming API.
> >>
> >> Please have a look here:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API
> >>
> >> Looking forward to your feedback!
> >>
> >>
> >> -Matthias
> >>
> >>
> >
>
>


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Jay Kreps
Two things:

   1. This is a minor thing but the proposed new name for KStreamBuilder
   is StreamsTopologyBuilder. I actually think we should not put topology in
   the name as topology is not a concept you need to understand at the
   kstreams layer right now. I'd think of three categories of concepts: (1)
   concepts you need to understand to get going even for a simple example, (2)
   concepts you need to understand to operate and debug a real production app,
   (3) concepts we truly abstract and you don't need to ever understand. I
   think in the kstream layer topologies are currently category (2), and this
   is where they belong. By introducing the name in even the simplest example
   it means the user has to go read about topologies to really understand even
   this simple snippet. What if instead we called it KStreamsBuilder?
   2. For the processor api, I think this api is mostly not for end users.
   However there are a couple of cases where it might make sense to expose it. I
   think users coming from Samza, or JMS's MessageListener (
   https://docs.oracle.com/javaee/7/api/javax/jms/MessageListener.html)
   understand a simple callback interface for message processing. In fact,
   people often ask why Kafka's consumer doesn't provide such an interface.
   I'd argue we do, it's KafkaStreams. The only issue is that the processor
   API documentation is a bit scary for a person implementing this type of
   api. My observation is that people using this style of API don't do a lot
   of cross-message operations, they just do single message operations and use
   a database for anything that spans messages. They also don't factor their
   code into many MessageListeners and compose them, they just have one
   listener that has the complete handling logic. Say I am a user who wants to
   implement a single Processor in this style. Do we have an easy way to do
   that today (either with the .transform/.process methods in kstreams or with
   the topology apis)? Is there anything we can do in the way of trivial
   helper code to make this better? Also, how can we explain that pattern to
   people? I think currently we have pretty in-depth docs on our apis but I
   suspect a person trying to figure out how to implement a simple callback
   might get a bit lost trying to figure out how to wire it up. A simple five
   line example in the docs would probably help a lot. Not sure if this is
   best addressed in this KIP or is a side comment.

Cheers,

-Jay

On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax 
wrote:

> Hi All,
>
> I did prepare a KIP to do some cleanup of Kafka's Streaming API.
>
> Please have a look here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-120%3A+Cleanup+Kafka+Streams+builder+API
>
> Looking forward to your feedback!
>
>
> -Matthias
>
>


Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-14 Thread Jay Kreps
+1

Nice improvement.

-Jay

On Tue, Feb 14, 2017 at 1:22 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:

> Hi, it looks like I have 2 of the 3 minimum votes, can a third voter
> please consider this KIP?
> Thanks.
>
> (PS - new revision on GitHub PR with hopefully the last round of
> improvements)
>
> > On Feb 8, 2017, at 9:06 PM, Matthias J. Sax 
> wrote:
> >
> > +1
> >
> > On 2/8/17 4:51 PM, Gwen Shapira wrote:
> >> +1 (binding)
> >>
> >> On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker
> >>  wrote:
> >>> Hi everyone,
> >>>
> >>> Thank you for constructive feedback on KIP-121,
> >>> KStream.peek(ForeachAction); it seems like it is time to call a vote
> >>> which I hope will pass easily :)
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-121%3A+Add+KStream+peek+method
> >>>
> >>> I believe the PR attached is already in good shape to consider merging:
> >>>
> >>> https://github.com/apache/kafka/pull/2493
> >>>
> >>> Thanks!
> >>> Steven
> >>>
> >>
> >>
> >>
> >
>
>


Re: [ANNOUNCE] New committer: Grant Henke

2017-01-11 Thread Jay Kreps
Congrats Grant!

-Jay

On Wed, Jan 11, 2017 at 11:51 AM, Gwen Shapira  wrote:

> The PMC for Apache Kafka has invited Grant Henke to join as a
> committer and we are pleased to announce that he has accepted!
>
> Grant contributed 88 patches, 90 code reviews, countless great
> comments on discussions, a much-needed cleanup to our protocol and the
> on-going and critical work on the Admin protocol. Throughout this, he
> displayed great technical judgment, high-quality work and willingness
> to contribute where needed to make Apache Kafka awesome.
>
> Thank you for your contributions, Grant :)
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: [VOTE] KIP-106 - Default unclean.leader.election.enabled True => False

2017-01-11 Thread Jay Kreps
+1

On Wed, Jan 11, 2017 at 10:56 AM, Ben Stopford  wrote:

> Looks like there was a good consensus on the discuss thread for KIP-106 so
> let's move to a vote.
>
> Please chime in if you would like to change the default for
> unclean.leader.election.enabled from true to false.
>
> https://cwiki.apache.org/confluence/display/KAFKA/%5BWIP%5D+KIP-106+-+Change+Default+unclean.leader.election.enabled+from+True+to+False
>
> B
>


Re: KafkaStreams StateStore as EventStore (Event Sourcing)

2016-12-16 Thread Jay Kreps
Good question! Here's my understanding.

The streams API has a config num.standby.replicas. If this value is set to
0, the default, then the local state will have to be recreated by
re-reading the relevant Kafka partition and replaying that into the state
store, and as you point out this will take time proportional to the amount
of data. If you set this value to something more than 0, then a "standby
task" will be kept on one of the other instances. This standby won't do any
processing; it will just passively replicate the state changes of the
primary task. In the event of a failure this standby task will be able to
take over very quickly because it already has the full state pre-created.

So you have a choice of redundancy in either "time" (by replaying data) or
"space" (by storing multiple copies).

(Hopefully that's correct, I don't have the firmest grasp on how the
standby tasks work.)
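
For what it's worth, here is roughly how the "space" option would be
configured. This is just a sketch using plain java.util.Properties; the
application id, broker address and class name are placeholders I made up, and
only the config keys are real Streams settings.

```java
import java.util.Properties;

// Hypothetical helper: Streams settings that keep warm copies of local state
// on other instances so that failover does not have to replay the changelog.
final class StandbyReplicaConfig {
    static Properties build(String applicationId, String bootstrapServers, int standbyReplicas) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        // 0 (the default) trades space for recovery time; 1 or more does the opposite.
        props.setProperty("num.standby.replicas", Integer.toString(standbyReplicas));
        return props;
    }

    public static void main(String[] args) {
        // "event-store-app" and "localhost:9092" are placeholder values.
        Properties props = build("event-store-app", "localhost:9092", 1);
        System.out.println(props.getProperty("num.standby.replicas"));
    }
}
```

Standby copies only help if there is more than one running instance of the
application to host them.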

-Jay

On Thu, Dec 15, 2016 at 6:10 PM, Anatoly Pulyaevskiy <
anatoly.pulyaevs...@gmail.com> wrote:

> Hi everyone,
>
> I've been reading a lot about new features in Kafka Streams and everything
> looks very promising. There is even an article on Kafka and Event Sourcing:
> https://www.confluent.io/blog/event-sourcing-cqrs-stream-processing-apache-kafka-whats-connection/
>
> There are a couple of things that I'm concerned about though. For Event
> Sourcing it is assumed that there is a way to fetch all events for a
> particular object and replay them in order to get "latest snapshot" of that
> object.
>
> It seems like (and the article says so) that StateStore in KafkaStreams can
> be used to achieve that.
>
> My first question is would it scale well for millions of objects?
> I understand that StateStore is backed by a compacted Kafka topic so in the
> event of a failure KafkaStreams will recover to the latest state by reading
> all messages from that topic. But my suspicion is that for millions of
> objects this may take a while (it would need to read the whole partition
> for each object), is this a correct assumption?
>
> My second question is would it make more sense to use an external DB in
> such case or is there a "best practice" around implementing Event Sourcing
> and using Kafka's internal StateStore as EventStore?
>
> Thanks,
> Anatoly
>


Re: Struggling with Kafka Streams rebalances under load / in production

2016-12-12 Thread Jay Kreps
I think the most common cause of rebalancing is still GC that exceeds the
consumer liveness timeout you've configured. Might be worth enabling GC
logging in Java and then checking the pause times. If they exceed the
timeout you have for liveness then you will detect that as a process
failure and rebalance.
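
GC logging itself is switched on with JVM flags (for example -verbose:gc). As
a coarse in-process complement, something like the sketch below reads the
JVM's cumulative GC time through the standard management beans. It does not
give individual pause lengths, so the GC log is still the better source; the
class name is made up, and I'm assuming the liveness timeout in question is
the consumer's session.timeout.ms.

```java
import java.lang.management.GarbageCollectorMXBean;
import java.lang.management.ManagementFactory;

// Hypothetical probe: sums the time the JVM reports having spent in GC.
final class GcTimeProbe {
    static long totalGcMillis() {
        long sum = 0;
        for (GarbageCollectorMXBean gc : ManagementFactory.getGarbageCollectorMXBeans()) {
            long millis = gc.getCollectionTime(); // -1 when the collector does not report it
            if (millis > 0) {
                sum += millis;
            }
        }
        return sum;
    }

    public static void main(String[] args) throws InterruptedException {
        long before = totalGcMillis();
        Thread.sleep(1000);
        long after = totalGcMillis();
        // A delta close to the configured timeout within one interval would be a red flag.
        System.out.println("GC time during the last second (ms): " + (after - before));
    }
}
```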

-Jay

On Sun, Dec 11, 2016 at 11:39 AM, Robert Conrad 
wrote:

> Hi All,
>
> I have a relatively complex streaming application that seems to struggle
> terribly with rebalance issues while under load. Does anyone have any tips
> for investigating what is triggering these frequent rebalances or
> particular settings I could experiment with to try to eliminate them?
>
> Originally I thought it had to do with exceeding the heartbeat timeout with
> heavy work threads, but the 0.10.1 release solved that by adding the
> background heartbeat thread (KIP-62: Allow consumer to send heartbeats from
> a background thread). Now rebalance just seems to strike randomly and
> provide no insight into what triggered it (all nodes are happy, everything
> seems to be running smoothly).
>
> Any help or insight is greatly appreciated!
>
> Rob
>


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-25 Thread Jay Kreps
-1

I think the REST server for Kafka that already exists is quite good and
getting contributions. Moving this into the core project doesn't solve a
problem that I see.

-Jay

On Tue, Oct 25, 2016 at 2:16 PM, Harsha Chintalapani 
wrote:

> Hi All,
>    We are proposing to have a REST Server as part of Apache Kafka
> to provide producer/consumer/admin APIs. We strongly believe having
> REST server functionality with Apache Kafka will help a lot of users.
> Here is the KIP that Mani Kumar wrote
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-80:+Kafka+Rest+Server.
> There is a discussion thread in dev list that had differing opinions on
> whether to include REST server in Apache Kafka or not. You can read more
> about that in this thread
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_
> aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E
>
>   This is a VOTE thread to check interest in the community for
> adding REST Server implementation in Apache Kafka.
>
> Thanks,
> Harsha
>


Re: KIP-33 Opt out from Time Based indexing

2016-08-22 Thread Jay Kreps
Can you describe the behavior you saw that you didn't like?

-Jay

On Mon, Aug 22, 2016 at 12:24 AM, Jan Filipiak 
wrote:

> Hello everyone,
>
> I stumbled across KIP-33 and the time based index, while briefly checking
> the wiki and commits, I fail to find a way to opt out.
> I saw it having quite some impact on when logs are rolled and was hoping
> not to have to deal with all of that. Is there a disable switch I
> overlooked?
>
> Does anybody have a good use case where the timebase index comes in handy?
> I made a custom console consumer for myself
> that can bisect a log based on time. It's just a quick probabilistic shot
> into the log but is sometimes quite useful for some debugging.
>
> Best Jan
>


Re: Question about bootstrap processing in KafkaStreams.

2016-06-28 Thread Jay Kreps
I think you may get this for free, as Kafka Streams attempts to align
consumption across different topics/partitions by the timestamp in the
messages. So in a case where you are starting a job fresh and it has a
database changelog to consume and an event stream to consume, it will
attempt to keep the KTable at the "time" the event stream is at. This is
only a heuristic, of course, since messages aren't necessarily strongly
ordered by time. I think this is likely mostly the same as, but slightly
better than, the bootstrap usage in Samza, and it also covers other cases of
alignment.

If you want more control you can override the timestamp extractor that
associates time and hence priority for the streams:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/processor/TimestampExtractor.html
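
To make the alignment idea concrete, here is a toy model in plain Java: always
take the next record from whichever input has the smallest timestamp at its
head. This is a simplified illustration, not the actual Kafka Streams code,
and every name in it is made up.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model of timestamp-aligned consumption across several inputs.
final class TimestampAlignedMerge {
    static final class Rec {
        final long ts;
        final String value;

        Rec(long ts, String value) {
            this.ts = ts;
            this.value = value;
        }
    }

    // Repeatedly consumes from the input whose head record is oldest.
    static List<String> drain(List<Deque<Rec>> inputs) {
        List<String> out = new ArrayList<>();
        while (true) {
            Deque<Rec> next = null;
            for (Deque<Rec> q : inputs) {
                if (!q.isEmpty() && (next == null || q.peek().ts < next.peek().ts)) {
                    next = q;
                }
            }
            if (next == null) {
                return out; // every input is exhausted
            }
            out.add(next.poll().value);
        }
    }

    public static void main(String[] args) {
        Deque<Rec> changelog = new ArrayDeque<>(Arrays.asList(
                new Rec(1, "table:a=1"), new Rec(5, "table:a=2")));
        Deque<Rec> events = new ArrayDeque<>(Arrays.asList(
                new Rec(3, "event:a"), new Rec(7, "event:a")));
        // prints [table:a=1, event:a, table:a=2, event:a]
        System.out.println(drain(Arrays.asList(changelog, events)));
    }
}
```

In this model, giving every changelog record a very small timestamp in a
custom extractor would make the table side drain first, which is roughly the
bootstrap behaviour being asked about.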

-Jay

On Tue, Jun 28, 2016 at 2:49 PM, Rohit Valsakumar 
wrote:

> Hi all,
>
> Is there a way to consume all the contents of a kafka topic into a KTable
> before doing a left join with another Kstream?
>
> I am looking at something that simulates a bootstrap topic in a Samza job.
>
> Thanks,
> Rohit Valsakumar
>
> 
>
> This email and any attachments may contain confidential and privileged
> material for the sole use of the intended recipient. Any review, copying,
> or distribution of this email (or any attachments) by others is prohibited.
> If you are not the intended recipient, please contact the sender
> immediately and permanently delete this email and any attachments. No
> employee or agent of TiVo Inc. is authorized to conclude any binding
> agreement on behalf of TiVo Inc. by email. Binding agreements with TiVo
> Inc. may only be made by a signed written agreement.
>


Re: Kafka Streams backend for Apache Beam?

2016-06-27 Thread Jay Kreps
Even though I'm not aware of anyone working on it, I think we'd definitely
be open to it if someone wants to take a swing at it.

-Jay

On Sun, Jun 26, 2016 at 9:13 AM, Alex Glikson  wrote:

> Hi all,
>
> I wonder whether there are plans to implement Apache Beam backend based on
> Kafka Streams?
>
> Thanks,
> Alex
>
>
>


Re: delay of producer and consumer in kafka 0.9 is too big to be accepted

2016-06-25 Thread Jay Kreps
Can you sanity check this with the end-to-end latency test that ships with
Kafka in the tools package?

https://apache.googlesource.com/kafka/+/1769642bb779921267bd57d3d338591dbdf33842/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala

On Saturday, June 25, 2016, Kafka  wrote:

> Hi all,
> my kafka cluster is composed of three brokers, each with an 8-core cpu,
> 8g of memory and a 1g network card.
> With the java async client, I sent 100 messages of 1024 bytes each, with a
> gap of 20us between sends. In the consumer's config, fetch.min.bytes is set
> to 1 and fetch.wait.max.ms is set to 100.
> To avoid inconsistency between two machines, I start the producer and the
> consumer on the same machine, which has enough resources for both clients.
>
> I start the consumer before the producer in each test. Each message carries
> its sending timestamp, so when the consumer receives a message I can get
> the consumer delay by subtracting the sending timestamp from the current
> timestamp.
> When I set acks to 0, replica to 2, the average producer delay is 2.98ms
> and the average consumer delay is 52.23ms.
> When I set acks to 1, replica to 2, the average producer delay is 3.9ms
> and the average consumer delay is 44.88ms.
> When I set acks to -1, replica to 2, the average producer delay is 1782ms
> and the average consumer delay is 1786ms.
>
> I have two questions. The first is why the consumer delay with acks set to
> 0 is longer than the consumer delay with acks set to 1.
> The second is why the delay of producer and consumer is so big when I set
> acks to -1. I think this delay cannot be accepted,
> and I found this delay is amplified when sending more messages.
>
> any feedback is appreciated.
> thanks
>
>
>
>


Re: [ANNOUCE] Apache Kafka 0.10.0.0 Released

2016-05-24 Thread Jay Kreps
Woohoo!!! :-)

-Jay

On Tue, May 24, 2016 at 9:24 AM, Gwen Shapira <gwens...@apache.org> wrote:

> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 0.10.0.0.
> This is a major release with exciting new features, including first
> release of KafkaStreams and many other improvements.
>
> All of the changes in this release can be found:
>
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/RELEASE_NOTES.html
>
> Apache Kafka is a high-throughput, publish-subscribe messaging system
> rethought of as a distributed commit log.
>
> ** Fast => A single Kafka broker can handle hundreds of megabytes of reads
> and
> writes per second from thousands of clients.
>
> ** Scalable => Kafka is designed to allow a single cluster to serve as the
> central data backbone
> for a large organization. It can be elastically and transparently expanded
> without downtime.
> Data streams are partitioned and spread over a cluster of machines to allow
> data streams
> larger than the capability of any single machine and to allow clusters of
> co-ordinated consumers.
>
> ** Durable => Messages are persisted on disk and replicated within the
> cluster to prevent
> data loss. Each broker can handle terabytes of messages without performance
> impact.
>
> ** Distributed by Design => Kafka has a modern cluster-centric design that
> offers
> strong durability and fault-tolerance guarantees.
>
> You can download the source release from
>
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka-0.10.0.0-src.tgz
>
> and binary releases from
>
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz
>
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz
>
> A big thank you to the following people who have contributed to the
> 0.10.0.0 release.
>
> Adam Kunicki, Aditya Auradkar, Alex Loddengaard, Alex Sherwin, Allen
> Wang, Andrea Cosentino, Anna Povzner, Ashish Singh, Atul Soman, Ben
> Stopford, Bill Bejeck, BINLEI XUE, Chen Shangan, Chen Zhu, Christian
> Posta, Cory Kolbeck, Damian Guy, dan norwood, Dana Powers, David
> Jacot, Denise Fernandez, Dionysis Grigoropoulos, Dmitry Stratiychuk,
> Dong Lin, Dongjoon Hyun, Drausin Wulsin, Duncan Sands, Dustin Cote,
> Eamon Zhang, edoardo, Edward Ribeiro, Eno Thereska, Ewen
> Cheslack-Postava, Flavio Junqueira, Francois Visconte, Frank Scholten,
> Gabriel Zhang, gaob13, Geoff Anderson, glikson, Grant Henke, Greg
> Fodor, Guozhang Wang, Gwen Shapira, Igor Stepanov, Ishita Mandhan,
> Ismael Juma, Jaikiran Pai, Jakub Nowak, James Cheng, Jason Gustafson,
> Jay Kreps, Jeff Klukas, Jeremy Custenborder, Jesse Anderson, jholoman,
> Jiangjie Qin, Jin Xing, jinxing, Jonathan Bond, Jun Rao, Ján Koščo,
> Kaufman Ng, kenji yoshida, Kim Christensen, Kishore Senji, Konrad,
> Liquan Pei, Luciano Afranllie, Magnus Edenhill, Maksim Logvinenko,
> manasvigupta, Manikumar reddy O, Mark Grover, Matt Fluet, Matt
> McClure, Matthias J. Sax, Mayuresh Gharat, Micah Zoltu, Michael Blume,
> Michael G. Noll, Mickael Maison, Onur Karaman, ouyangliduo, Parth
> Brahmbhatt, Paul Cavallaro, Pierre-Yves Ritschard, Piotr Szwed,
> Praveen Devarao, Rafael Winterhalter, Rajini Sivaram, Randall Hauch,
> Richard Whaling, Ryan P, Samuel Julius Hecht, Sasaki Toru, Som Sahu,
> Sriharsha Chintalapani, Stig Rohde Døssing, Tao Xiao, Tom Crayford,
> Tom Dearman, Tom Graves, Tom Lee, Tomasz Nurkiewicz, Vahid Hashemian,
> William Thurston, Xin Wang, Yasuhiro Matsuda, Yifan Ying, Yuto
> Kawamura, zhuchen1018
>
> We welcome your help and feedback. For more information on how to
> report problems, and to get involved, visit the project website at
> http://kafka.apache.org/
>
> Thanks,
>
> Gwen
>


Re: Kafka Streams: KStream - KTable Left Join

2016-05-04 Thread Jay Kreps
Is it possible to make the error message give more of an explanation?
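
For what it's worth, here is a rough sketch of why the join gets rejected, following the explanation in the quoted reply below: once map() changes the key, a record may no longer sit in the partition its new key would hash to. This is a toy model built on String.hashCode(), not Kafka's actual partitioner, and the names partitionFor and stillCoLocated are made up for illustration.

```java
// Illustrative only: a toy hash partitioner, not Kafka's real partitioning code.
class CoPartitioningSketch {
    // Hypothetical helper: map a key to one of numPartitions partitions.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // A re-keyed record is still co-located with the table only if the new key
    // hashes to the partition the record was originally written to.
    static boolean stillCoLocated(String oldKey, String newKey, int numPartitions) {
        return partitionFor(oldKey, numPartitions) == partitionFor(newKey, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        System.out.println("a -> partition " + partitionFor("a", partitions));
        System.out.println("b -> partition " + partitionFor("b", partitions));
        System.out.println("re-keying a to b keeps co-location: "
                + stillCoLocated("a", "b", partitions));
    }
}
```

In this toy model, re-keying a record from "a" to "b" moves it to a different partition, which is the situation that writing through an intermediate topic repairs.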

-Jay

On Wed, May 4, 2016 at 8:46 AM, Matthias J. Sax 
wrote:

> Hi,
>
> I am still new to Kafka Streams myself, but from my understanding, if
> you change the key, your partitioning changes, i.e., it is not valid anymore.
> Thus, the join (which assumes co-located data) cannot be performed
> (this is the reason why sources get set to null). You can write to an
> intermediate topic via .through(...) to get a valid partitioning:
>
> KStream dataStream = builder.stream(...).map(...).through(...);
>
> Afterward, your join should work.
>
> -Matthias
>
>
> On 05/04/2016 04:43 PM, Gaspar Muñoz wrote:
> > Hi there,
> >
> > I am not able to perform a Left Join between a KStream and KTable in
> Kafka
> > Streams.
> >
> > Exception in thread "main"
> > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid
> topology
> > building: KSTREAM-FILTER-03 and KTABLE-SOURCE-05 are not
> > joinable
> > at
> >
> org.apache.kafka.streams.kstream.internals.AbstractStream.ensureJoinableWith(AbstractStream.java:44)
> > at
> >
> org.apache.kafka.streams.kstream.internals.KStreamImpl.leftJoin(KStreamImpl.java:383)
> >
> >
> > My code looks like this:
> >
> > KStream dataStream = builder
> > .stream(stringDeserializer, stringDeserializer,
> > conf.getString(INPUT_TOPIC))
> > .map(App::keyByAap);
> >
> > KTable aapTable = builder.table("lookup_topic");
> >
> > KStream result = dataStream.leftJoin(aapTable, new
> > ValueJoinerAap());
> >
> >
> > My ValueJoiner simply concatenates two Strings. I think the problem is in
> > map, where I change the key in order to join by this field. In the
> > internals of Kafka Streams we can see that the map function sets the
> > source nodes to null:
> >
> > @Override
> > public <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper) {
> >     String name = topology.newName(MAP_NAME);
> >
> >     topology.addProcessor(name, new KStreamMap<>(mapper), this.name);
> >
> >     return new KStreamImpl<>(topology, name, null);
> > }
> >
> >
> > And after that, in the left join precondition, in the call Set
> > allSourceNodes = ensureJoinableWith((AbstractStream) other); the code
> > checks whether either set of source nodes is null and, since the KStream
> > I applied map to has its source nodes set to null, it throws the exception.
> >
> > if (thisSourceNodes == null || otherSourceNodes == null)
> > throw new TopologyBuilderException(this.name + " and " +
> > other.name + " are not joinable");
> >
> >
> > The question is, how can I change the key without losing the parent data,
> > in order to perform a join with the KTable after that?
> >
> > Thanks.
> >
>
>


Re: Kafka Connector for Solr

2016-04-22 Thread Jay Kreps
This is great!

-Jay

On Fri, Apr 22, 2016 at 2:28 PM, Surendra , Manchikanti <
surendra.manchika...@gmail.com> wrote:

> Hi,
>
> I have implemented a Kafka connector for Solr. Please find the GitHub link
> below.
>
> https://github.com/msurendra/kafka-connect-solr
>
> The initial release has only the SolrSinkConnector; the SolrSourceConnector
> is under development and I will add it soon.
>
> Regards,
> Surendra M
>


Re: Kafka Connect concept question

2016-04-07 Thread Jay Kreps
Another way to think about this is that the producer allows you to PUSH
data into Kafka and the consumer allows you to PULL data out. This is what
you need to write an application.

However, for an existing data system you need the opposite: you need to PULL
data into Kafka from the system or PUSH it out of Kafka into the system.
Kafka Connect implements this. Technically you could implement this from
scratch but then you'd just be rebuilding what Connect itself does (as Ewen
said).
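
To make the two directions concrete, here is a toy sketch. None of these types are the Kafka Connect API; Log, runSource and runSink are hypothetical stand-ins that only show which side does the pulling and which side does the pushing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Toy model only: hypothetical stand-ins, not the Kafka Connect API.
class PushPullSketch {
    // Stand-in for a topic: applications PUSH into it and PULL out of it.
    static final class Log {
        private final List<String> records = new ArrayList<>();

        void push(String record) {            // producer side
            records.add(record);
        }

        List<String> pullFrom(int offset) {   // consumer side
            return new ArrayList<>(records.subList(offset, records.size()));
        }
    }

    // "Source" direction: PULL from an existing system, then write into the log.
    static int runSource(Supplier<List<String>> externalSystem, Log log) {
        List<String> batch = externalSystem.get();
        batch.forEach(log::push);
        return batch.size();
    }

    // "Sink" direction: read from the log, then PUSH out to an existing system.
    static int runSink(Log log, int fromOffset, Consumer<String> externalSystem) {
        List<String> batch = log.pullFrom(fromOffset);
        batch.forEach(externalSystem);
        return fromOffset + batch.size();     // next offset to read from
    }
}
```

A real framework adds the parts this leaves out (distribution, fault tolerance, offset tracking, serialization), which is the point Ewen makes in the quoted message.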

Not sure if that made things more or less clear :-)

-Jay

On Thu, Apr 7, 2016 at 1:41 PM, Ewen Cheslack-Postava 
wrote:

> On Wed, Apr 6, 2016 at 8:56 AM, Uber Slacker 
> wrote:
>
> > Hi folks. I'm pretty new to Kafka. I have spent a fair amount of time so
> > far understanding the Kafka system in general and how producers and
> > consumers work. I'm now trying to get a grasp on how Kafka Connect
> > compares/contrasts to Producers/Consumers written via the Java API.
> >
> > When might someone want to write their own Java Producer/Consumer versus
> > using a connector in Kafka Connect?  How does Kafka Connect use producers
> > and consumers behind the scenes? Why wouldn't we simply want a
> > producer/consumer library that contains producers and consumers written
> to
> > work with various external systems such as HDFS? Why this new framework?
> > Thanks for any clarification!
> >
>
> Internally Connect does use the producer and consumer. However, the
> framework adds a lot of support for functionality you want specifically
> when you are copying data from another system to Kafka or from Kafka to
> another system. Connect handles distribution and fault tolerance for you at
> the framework level. It provides a schema/data API and abstracts away
> details of serialization such that you can write a single connector and
> support multiple formats.
>
> If you're trying to copy data to/from another system, we'd generally
> recommend using the Connect framework since it adds all this extra support
> and allows you to focus only on how you get the data into/out of the other
> system. You'll want to use producers and consumers directly if you need
> more control that Connect hides from you, but then you'll need to create
> your own implementation of features Connect provides (or simply not support
> them).
>
> -Ewen
>


Re: Kafka connect to database to push consumed data

2016-04-01 Thread Jay Kreps
There isn't a jdbc sink yet, though that is actually a very useful and easy
connector to write. I think it can be pretty efficient as long as it uses
prepared statements and batch insert...
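
As a rough illustration of the prepared statement plus batch insert idea, using only java.sql: this is a sketch, not an actual connector, and the class and method names (JdbcBatchSketch, insertSql, writeBatch) are invented here. The table and column names are whatever the caller passes in and are assumed to be trusted identifiers.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

// Sketch only: not a real Kafka Connect sink.
class JdbcBatchSketch {
    // Builds e.g. "INSERT INTO t (a, b) VALUES (?, ?)" for the given table and columns.
    static String insertSql(String table, List<String> columns) {
        String cols = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")";
    }

    // Binds every row to one prepared statement and submits them as a single batch.
    static int[] writeBatch(Connection conn, String table, List<String> columns,
                            List<List<Object>> rows) throws SQLException {
        try (PreparedStatement stmt = conn.prepareStatement(insertSql(table, columns))) {
            for (List<Object> row : rows) {
                for (int i = 0; i < row.size(); i++) {
                    stmt.setObject(i + 1, row.get(i)); // JDBC parameters are 1-based
                }
                stmt.addBatch();
            }
            return stmt.executeBatch();
        }
    }
}
```

The statement is parsed once and reused for every row, and the rows are handed to the driver together rather than one execute call per record.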

-Jay

On Fri, Apr 1, 2016 at 6:06 AM, Kavitha Veluri 
wrote:

> Thank you for the guidance. Does Kafka Connect JDBC also support writing
> data into a database? I understand it efficiently reads data from any
> relational database, but I'm looking to write the consumed topics into a
> database.
>
> Thanks,
> Kavitha veluri
>
> On Mar 31, 2016, at 10:51 PM, Surendra, Manchikanti <
> surendra.manchika...@gmail.com>
> wrote:
>
> Please look into Confluent Kafka connectors.
>
> http://www.confluent.io/developers/connectors
>
> -- Surendra Manchikanti
>
> On Thu, Mar 31, 2016 at 6:43 PM, Kavitha Veluri <
> kavitha.vel...@outlook.com> wrote:
> Hi,
>
> I'm trying to use Kafka Streams for my use case, which consumes data from a
> producer, processes it, and pushes that data to a database.
>
> I spent time trying to find any similar examples but couldn't find one.
>
> Can you please help me by sending a sample example code for my use case?
>
> Any help is greatly appreciated .
>
> Thanks ,
> Kavitha veluri
>
>


Re: Help understanding what happened

2016-03-19 Thread Jay Kreps
If you hard kill the broker, then when it restarts it doesn't know the
status of its on-disk files, so it will need to run through the last log
segment to validate the checksums of messages and rebuild the index
off this to ensure consistency. (Why does it need to do this
validation? Because in the event of a server crash filesystems have
very little in the way of general guarantees for unflushed data, so
they can legally leave invalid bytes you never wrote in unflushed
portions of the log or index (shocking but true). In your case it
wasn't a server crash, just a process kill, but there is no way to
differentiate, so we have to pessimistically do this check on any
restart after an unclean shutdown.) This message doesn't indicate
anything bad other than that an unclean shutdown occurred (which is why
it's a WARN). I do think maybe that error message is a bit too
alarming, though.

The actual reason for the sanity check failure is that we memory map
fixed-size index file chunks, which means that the first part of the
index contains real entries and the rest is just unfilled zeros. The
unfilled zero at the end is read as the increment from the base offset of
the segment, and the sanity check expects that increment to be greater than
zero, so it fails. Had that check not failed, the index would still be
rebuilt, I think, since we're going to run recovery on the segment
regardless, but I guess that is the check hit first (haven't looked at the
sequencing).
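
Here is a simplified model of that check, in case it helps. The 8-byte entry layout (4-byte relative offset plus 4-byte file position) and all names are assumptions for illustration; this is not the real OffsetIndex code. It also assumes that after an unclean shutdown the whole preallocated file is treated as entries, whereas after a clean one only the real entries are.

```java
import java.nio.ByteBuffer;

// Simplified model of an offset index; layout and names are assumptions.
class IndexSanitySketch {
    static final int ENTRY_SIZE = 8; // assumed: 4-byte relative offset + 4-byte position

    // Preallocate a fixed-size, zero-filled index and write `count` real entries up front.
    static ByteBuffer buildIndex(int sizeBytes, int count) {
        ByteBuffer buf = ByteBuffer.allocate(sizeBytes);
        for (int i = 0; i < count; i++) {
            buf.putInt(i * ENTRY_SIZE, (i + 1) * 10);  // offset relative to the base offset
            buf.putInt(i * ENTRY_SIZE + 4, i * 4096);  // position in the log file
        }
        return buf;
    }

    // Last offset when the first `entries` slots are treated as valid.
    static long lastOffset(ByteBuffer index, long baseOffset, int entries) {
        if (entries == 0) {
            return baseOffset;
        }
        return baseOffset + index.getInt((entries - 1) * ENTRY_SIZE);
    }

    // Passes only if the index is empty or its last offset is past the base offset.
    static boolean sanityCheck(ByteBuffer index, long baseOffset, int entries) {
        return entries == 0 || lastOffset(index, baseOffset, entries) > baseOffset;
    }

    public static void main(String[] args) {
        long base = 16763460L;
        ByteBuffer index = buildIndex(80, 3);
        System.out.println("trusting the 3 real entries: " + sanityCheck(index, base, 3));
        System.out.println("trusting all 10 slots: " + sanityCheck(index, base, 10));
    }
}
```

When all ten slots are trusted, the last slot is a zero, so the computed last offset equals the base offset and the check fails even though the three real entries are fine.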

-Jay

On Wed, Mar 16, 2016 at 3:17 PM, Scott Reynolds  wrote:
> In a test in staging environment, we kill -9 the broker. It was started
> back up by runit and started recovering. We are seeing errors like this:
>
> WARN Found an corrupted index file,
> /mnt/services/kafka/data/TOPIC-17/16763460.index, deleting and
> rebuilding index... (kafka.log.Log)
>
> The file size is a multiple of 8 (10485760) and the file has entries.
>
> So this leads me to believe that lastOffset <= baseOffset (
> https://code.hq.twilio.com/data/kafka/blob/35003fd51b80d605d40339dead623012442a92ab/core/src/main/scala/kafka/log/OffsetIndex.scala#L354
> )
>
> Was wondering how that could happen ? Isn't baseOffset taken from the
> filename and therefore is the FIRST entry in the log ? All other entries
> should be greater than that.


Re: New client commitAsync SendFailedException

2016-03-14 Thread Jay Kreps
This seems like a bug, no? It should just initiate the request, not wait for
it to be written; there is no way for the user to reason about the state of
the send buffer.
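
In the meantime, the mitigation Jason suggests in the quoted message (batching the commits into a single commitAsync() call) could look roughly like the sketch below. Partitions are plain strings and offsets plain longs here, standing in for the client's own types, and CommitBatcherSketch is a made-up name.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: strings and longs stand in for the consumer client's types.
class CommitBatcherSketch {
    private final Map<String, Long> pending = new HashMap<>();

    // Remember the highest offset seen per partition instead of committing right away.
    void record(String partition, long offset) {
        pending.merge(partition, offset, Math::max);
    }

    // Hand back everything accumulated so far, to be passed to one commit call.
    Map<String, Long> drain() {
        Map<String, Long> batch = new HashMap<>(pending);
        pending.clear();
        return batch;
    }
}
```

The idea is to call record() wherever the code currently commits, and then issue one commit with the drained map before the next poll, so only a single request is queued in the send buffer.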

-jay

On Monday, March 14, 2016, Jason Gustafson  wrote:

> Hey Alexey,
>
> Asynchronous commit handling could probably be improved quite a bit.
> Basically what's happening is that the client's send buffer is getting
> filled up, which causes the subsequent commits to fail with
> SendFailedException. We don't currently implement any retrying for
> asynchronous commits, so even transient failures get bubbled up to the
> user. This is... unfortunate. We probably can implement some retry logic,
> but we need to be careful about preserving the commit order. Maybe
> this is as simple as adding sequence numbers for each partition. I haven't
> thought it all the way through, but you're welcome to open a JIRA if you
> think it would be helpful.
>
> For now, the easiest way to mitigate the problem is probably to batch the
> commits into a single call to commitAsync(). Would that work for you?
>
> -Jason
>
> On Mon, Mar 14, 2016 at 8:18 AM, Alexey Romanchuk <
> alexey.romanc...@gmail.com > wrote:
>
> > Hi all!
> >
> > I am using new client 0.9.0.1.
> >
> > I found that when I call commitAsync multiple times before calling poll
> > most of the commits fail with SendFailedException.
> >
> > Here is an example of the code -
> > https://gist.github.com/13h3r/42633bcd64b80ddffe6b
> >
> > Could you please explain commitAsync in more detail?
> >
> > What is wrong with calling commitAsync multiple times? Should I call poll
> > immediately after calling commitAsync? Should I precalculate offsets for
> > multiple commits for every topic/partition to minimize number of offsets?
> >
> > Thanks!
> >
>


Re: Kafka Streams

2016-03-11 Thread Jay Kreps
Hey David,

The commit always happens at a "safe point", when the local portion of the
processing topology has fully processed a set of inputs. The frequency is
controlled by the property commit.interval.ms.
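
For example, the setting could be put into the config like this. Only the property name comes from the above; the helper name is made up and 30000 ms is an arbitrary example value, not a recommendation.

```java
import java.util.Properties;

// Sketch: builds a config object carrying the commit frequency.
class StreamsCommitConfigSketch {
    static Properties withCommitInterval(long millis) {
        Properties props = new Properties();
        // How often processed positions are committed, in milliseconds.
        props.put("commit.interval.ms", Long.toString(millis));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withCommitInterval(30000L));
    }
}
```

A shorter interval means less reprocessing after a failure at the cost of more frequent commits.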

-Jay

On Fri, Mar 11, 2016 at 9:28 AM, David Buschman <david.busch...@timeli.io>
wrote:

> @Jay, I currently use reactive-kafka for my Kafka sources and sinks in my
> stream processing apps. I was interested to see if this new stream API
> would make that setup easier/simpler/better in the future when it becomes
> available.
>
> How does the Streams API handle the commit offsets? Since you are
> processing "1-at-a-time", is commit handling automatic at the
> beginning/end of the processing, or can we specify where in the processing
> an offset commit happens?
>
> Thanks,
> DaVe.
>
> David Buschman
> d...@timeli.io
>
>
>
> > On Mar 11, 2016, at 7:21 AM, Dick Davies <d...@hellooperator.net> wrote:
> >
> > Nice - I've read topics on the idea of a database as the 'now' view of a
> stream
> > of updates, it's a very powerful concept.
> >
> > Reminds me of Rich Hickey's talk on Datomic, if anyone's seen that.
> >
> >
> >
> > On 10 March 2016 at 21:26, Jay Kreps <j...@confluent.io> wrote:
> >> Hey all,
> >>
> >> Lots of people have probably seen the ongoing work on Kafka Streams
> >> happening. There is no real way to design a system like this in a
> vacuum,
> >> so we put up a blog, some snapshot docs, and something you can download
> and
> >> use easily to get feedback:
> >>
> >>
> http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple
> >>
> >> We'd love comments or thoughts from anyone...
> >>
> >> -Jay
>
>


Re: Kafka Streams

2016-03-10 Thread Jay Kreps
Hey David,

Yeah I think the similarity to Spark (and Flink and RxJava) is the stream
api style in the DSL. That is totally the way to go for stream processing.
We tried really hard to make that work early on when we were doing Samza,
but we really didn't understand the whole iterator/observable distinction
and the experiment wasn't very successful. We ended up doing a process()
callback in Samza, which I think is just much less readable. One of the
nice things about Kafka Streams is that I think we really got this right.
The API is split into two layers. The first is a kind of infrastructure
layer based on modeling data flow DAGs; in some sense all stream processing
boils down to this, though it is not necessarily the most readable way to
express it.
This layer is documented here (
http://docs.confluent.io/2.1.0-alpha1/streams/developer-guide.html#streams-developer-guide-processor-api).
Then on top of that you can layer any kind of DSL or language you like. The
KStreams layer is our take on a readable DSL.
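
To illustrate the readability difference between the two styles, here is an analogy in plain Java. It uses java.util.stream as a stand-in and is not the Kafka Streams API; both halves compute the same word count, one as a process() callback with its own state and one as a chain of transformations.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Analogy only: plain Java collections, not the Kafka Streams API.
class TwoStylesSketch {
    // Callback style: the processor is handed one record at a time and mutates its state.
    static final class WordCountProcessor {
        final Map<String, Long> counts = new TreeMap<>();

        void process(String line) {
            for (String word : line.toLowerCase().split("\\s+")) {
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
    }

    // Fluent style: the same logic expressed as a chain of transformations.
    static Map<String, Long> wordCount(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\s+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.groupingBy(
                        Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello kafka", "hello streams");
        WordCountProcessor processor = new WordCountProcessor();
        lines.forEach(processor::process);
        System.out.println(processor.counts);
        System.out.println(wordCount(lines));
    }
}
```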

As for RxJava, it is super cool. We looked at it a little bit as a
potential alternative language versus doing a custom DSL in KStreams. There
is enough that is unique to distributed stream processing, including the
whole table/stream distinction, the details of the partitioning model and
when data is committed, etc., that we felt trying to glue something on top
would end up being a bit limiting. That said, I think there is
reactive-streams integration for Kafka, though I have no experience with it:
  https://github.com/akka/reactive-kafka

Cheers,

-Jay

On Thu, Mar 10, 2016 at 3:26 PM, David Buschman <david.busch...@timeli.io>
wrote:

> Very interesting, looks a lot like many operations from Spark were brought
> across.
>
> Any plans to integrate with the reactive-stream protocol for
> interoperability with libraries akka-stream and RxJava?
>
> Thanks,
> DaVe.
>
> David Buschman
> d...@timeli.io
>
>
>
> > On Mar 10, 2016, at 2:26 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > Hey all,
> >
> > Lots of people have probably seen the ongoing work on Kafka Streams
> > happening. There is no real way to design a system like this in a vacuum,
> > so we put up a blog, some snapshot docs, and something you can download
> and
> > use easily to get feedback:
> >
> >
> http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple
> >
> > We'd love comments or thoughts from anyone...
> >
> > -Jay
>
>


Kafka Streams

2016-03-10 Thread Jay Kreps
Hey all,

Lots of people have probably seen the ongoing work on Kafka Streams
happening. There is no real way to design a system like this in a vacuum,
so we put up a blog, some snapshot docs, and something you can download and
use easily to get feedback:

http://www.confluent.io/blog/introducing-kafka-streams-stream-processing-made-simple

We'd love comments or thoughts from anyone...

-Jay


Re: session.timeout.ms limit

2016-03-02 Thread Jay Kreps
Hey Gligor,

Sorry for the rough edges. I think there are a variety of rough edges in
error messages here we can improve:

   1. "Error ILLEGAL_GENERATION occurred while committing offsets for group
   MetadataConsumerSpout" is obviously NOT the most intuitive error message;
   it doesn't really explain what happened or what to do to fix it. We should
   improve that.
   2. The error when you exceed your max session timeout doesn't tell you
   that there is a config on the broker that controls this. There isn't an
   easy way to figure this out on your own.
   3. The default max on the broker should probably be higher?
   4. The docs on group.max.session.timeout.ms should explain why this
   exists, why you'd want to change it, etc.
   5. We are working on a way to control the maximum number of messages per
   poll request which will help reduce the session timeout you need, but I
   think that is orthogonal to the confusion.
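
As a sketch of what point 2 could look like, here is a range check whose error names the broker-side settings. The method and wording are hypothetical, not what the client or broker actually does. The property names session.timeout.ms, group.min.session.timeout.ms and group.max.session.timeout.ms are the real config names, and the bounds are passed in by the caller rather than assumed.

```java
// Sketch only: hypothetical validation with a more helpful message.
class SessionTimeoutCheckSketch {
    // Returns null if the value is acceptable, otherwise a message that
    // tells the user which broker settings control the allowed range.
    static String validate(int sessionTimeoutMs, int brokerMinMs, int brokerMaxMs) {
        if (sessionTimeoutMs >= brokerMinMs && sessionTimeoutMs <= brokerMaxMs) {
            return null;
        }
        return "session.timeout.ms=" + sessionTimeoutMs
                + " is outside the range [" + brokerMinMs + ", " + brokerMaxMs
                + "] allowed by the broker; adjust group.min.session.timeout.ms /"
                + " group.max.session.timeout.ms on the broker or pick a value"
                + " inside that range";
    }
}
```

With a broker range of 6000 to 30000 ms (example values), asking for 60000 ms would produce a message that points straight at the broker config instead of leaving the user to search for it.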

Thanks for walking through the steps you had to go through in figuring it
out; that will help us to round off some of the corners.

-Jay

On Wed, Mar 2, 2016 at 6:09 AM, Gligor Vanessa 
wrote:

> Hello,
>
> I am using Kafka higher consumer 0.9.0. I am not using auto commit for
> the offsets, so after I consume the messages (poll from Kafka) I have
> to commit the offsets manually.
>
> The issue that I have is that the processing of the messages takes
> longer than 30s (and I cannot call poll again before these messages are
> processed), and when I try to commit the offsets an exception is thrown:
> ERROR o.a.k.c.c.i.ConsumerCoordinator - Error ILLEGAL_GENERATION occurred
> while committing offsets for group MetadataConsumerSpout.
> (I found this explanation on Stack Overflow: if you wait longer
> than the timeout, the coordinator for the topic will kick out
> the consumer because it thinks it is dead, and it will rebalance the group.)
>
> In order to get rid of this I have thought about a couple of solutions:
>
> 1. The configuration session.timeout.ms has a maximum value, so if I try to
> set it to 60 seconds I also get an exception, because this value is not in
> the valid interval.
>
> 2. I have tried to find a solution to get a paginated request when the
> polling method is called - no success.
>
> 3. I have tried to send a heart beat from the outside of the poll (because
> this method sends the heartbeats) - no success.
>
>
> Thank you.
>
> Vanessa.
>


Re: Exactly-once publication behaviour

2016-02-21 Thread Jay Kreps
Hey Andrew,

Yeah I think the current state is that we did several designs and prototypes
(the transaction work, the idempotence design, and the conditional
write KIP), but none of these offshoots is really fully rationalized with
the other ones. Slow progress in this area has been mainly due to time
constraints--no one working on it full time. We're interested in picking up
the work at Confluent and hope to get some design ideas out on wiki for
discussion in the next month or so.

-Jay

On Sun, Feb 21, 2016 at 5:52 AM, Andrew Schofield <
andrew_schofield_j...@outlook.com> wrote:

> Hi,
> That's good, but somewhat mysterious :) I'd like to help accelerate
> exactly-once behaviour.  Perhaps it might be a candidate for the release
> after 0.10, if there's sufficient interest in the community.
>
> Could someone share the latest thinking on this subject? I'm not sure what
> the appropriate forum is. How have you previously been sharing high-level
> architectural plans for people to pick up and deliver? Since there's no KIP
> I guess it's a bit premature to discuss on the KIP call.
>
> Thanks
> Andrew
>
> > Subject: Re: Exactly-once publication behaviour
> > From: b...@confluent.io
> > Date: Fri, 19 Feb 2016 15:13:07 -0800
> > To: users@kafka.apache.org
> >
> > Hi Andrew
> >
> > There are plans to add exactly once behaviour. This will likely be a
> little more than Idempotent producers with the motivation being to provide
> better delivery guarantees for Connect, Streams and Mirror Maker.
> >
> > B
> >
> >
> >
> >> On 19 Feb 2016, at 13:54, Andrew Schofield  wrote:
> >>
> >> When publishing messages to Kafka, you make a choice between
> at-most-once and at-least-once delivery, depending on whether you wait for
> acknowledgments and whether you retry on failures. In most cases, those
> options are good enough. However, some systems offer exactly-once
> reliability too. Although my view is that the practical use of exactly-once
> is limited in the situations that Kafka is generally used for, when you're
> connecting other systems to Kafka or bridging between protocols, I think
> there is value in propagating the reliability level that the other system
> expects.
> >>
> >> As a consumer, you can manage your offset and get exactly-once
> delivery, or more likely exactly-once processing, of the messages.
> >>
> >> I've read about idempotent producers (
> https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer)
> and I know there's been some discussion about transactions too.
> >>
> >> Is there a plan to provide the tools to enable exactly-once publication
> behaviour? Is this a planned enhancement to Kafka Connect? Is there already
> some technique that people are using effectively to get exactly-once?
> >>
> >> Andrew Schofield
> >
>
>


Re: Kafka Streams: Possible to achieve at-least-once delivery with Streams?

2016-02-18 Thread Jay Kreps
Yeah I didn't mean to imply that we committed after each poll, but rather
that when it was time to commit, this would happen on the next poll call
and hence only commit processed messages.
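
A toy simulation of that behaviour, in case it makes the interplay with the interval clearer. This is not the real consumer. It only models when offsets get committed, and it assumes the application finishes processing a batch before it calls poll() again.

```java
import java.util.ArrayList;
import java.util.List;

// Toy simulation, not the real consumer: it only models commit timing.
class AutoCommitSketch {
    private final long intervalMs;
    private long lastCommitAtMs = 0;
    private long position = 0;   // next offset to hand out
    private long committed = 0;  // everything below this offset is committed

    AutoCommitSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Each poll first commits what earlier polls returned (if the interval has
    // elapsed) and only then hands out new records.
    List<Long> poll(long nowMs, int maxRecords) {
        if (nowMs - lastCommitAtMs >= intervalMs) {
            committed = position; // covers only records returned by previous polls
            lastCommitAtMs = nowMs;
        }
        List<Long> batch = new ArrayList<>();
        for (int i = 0; i < maxRecords; i++) {
            batch.add(position++);
        }
        return batch;
    }

    long committed() {
        return committed;
    }

    public static void main(String[] args) {
        AutoCommitSketch consumer = new AutoCommitSketch(100);
        consumer.poll(0, 3);
        consumer.poll(50, 3);
        consumer.poll(120, 3);
        System.out.println("committed up to offset " + consumer.committed());
    }
}
```

The interval decides whether a given poll commits at all; the commit itself never runs ahead of records the application has already been handed and, under the assumption above, processed.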

-Jay

On Thu, Feb 18, 2016 at 2:21 PM, Avi Flax <a...@aviflax.com> wrote:

> On Thu, Feb 18, 2016 at 4:26 PM, Jay Kreps <j...@confluent.io> wrote:
> > The default semantics of the new consumer with auto commit are
> > at-least-once-delivery. Basically during the poll() call the commit will
> be
> > triggered and will commit the offset for the messages consumed during the
> > previous poll call. This is an advantage over the older scala consumer
> > where the consumer did not have this guarantee because the commit
> happened
> > asynchronously in a separate thread and hence could end up preceding or
> > succeeding the actual processing of data.
>
> Sorry, I just realized I have one follow-up question on this… if this
> is the case, then why does the new consumer have the config property
> `auto.commit.interval.ms`? The approach of committing automatically
> after an interval seems to me to be at odds with the approach of
> committing after each call to poll(). What am I missing here?
>
> Thanks!
> Avi
>


Re: Kafka Streams: Possible to achieve at-least-once delivery with Streams?

2016-02-18 Thread Jay Kreps
The default semantics of the new consumer with auto commit are
at-least-once-delivery. Basically during the poll() call the commit will be
triggered and will commit the offset for the messages consumed during the
previous poll call. This is an advantage over the older scala consumer
where the consumer did not have this guarantee because the commit happened
asynchronously in a separate thread and hence could end up preceding or
succeeding the actual processing of data.

For streams this is exactly the same, and the guarantee is also
at-least-once-delivery (we will work on strengthening this in the future).

-Jay

On Thu, Feb 18, 2016 at 1:00 PM, Avi Flax  wrote:

> Hello all, I have a question about Kafka Streams, which I’m evaluating
> for a new project. (I know it’s still a work in progress but it might
> work anyway for this project.)
>
> I’m new to Kafka in particular and distributed systems in general so
> please forgive me if I’m confused about any of these concepts.
>
> From reading the docs on the new consumer API, I have the impression
> that letting the consumer auto-commit is roughly akin to at-most-once
> delivery, because a commit could occur past a record that wasn’t
> actually processed. So in order to achieve at-least-once delivery, one
> needs to employ “manual offset control” and explicitly commit after
> processing has succeeded.
>
> If I’ve got that right, then that leads me to my question about KStreams.
>
> From looking at the word count examples, it seems pretty clear that
> using the “lower level” approach demonstrated in WordCountProcessorJob
> — wherein a TopologyBuilder is created and supplied with a
> ProcessorSupplier that supplies a Processor that receives a
> ProcessorContext and calls commit on that ProcessorContext once
> processing succeeds — enables the at-least-once delivery model. OK,
> cool.
>
> Looking at WordCountJob, however, which uses KStreams, I don’t see any
> committing happening there explicitly, and in fact I searched the
> entire kstream source tree (for the kstream package and its internals
> sub-package) and I don’t see any calls to commit there. So my
> _impression_ is that maybe any KStream topology can use only
> auto-commit, and therefore only at-most-once processing.
>
> Basically I’m wondering if my impression is correct, or not, or if I’m
> just totally misunderstanding the code in its current state.
>
> Thanks!
> Avi
>


Re: What is the best way to write Kafka data into HDFS?

2016-02-11 Thread Jay Kreps
Check out Kafka Connect:

http://www.confluent.io/blog/how-to-build-a-scalable-etl-pipeline-with-kafka-connect

-Jay


On Wed, Feb 10, 2016 at 5:09 PM, R P  wrote:

> Hello All,
>   New Kafka user here. What is the best way to write Kafka data into HDFS?
> I have looked into following options and found that Flume is quickest and
> easiest to setup.
>
> 1. Flume
> 2. KaBoom
> 3. Kafka Hadoop Loader
> 4. Camus -> Gobblin
>
> However, Flume can result in small-file problems when your data is
> partitioned and some partitions generate sporadic data.
>
> What are some best practices and options to write data from Kafka to HDFS?
>
> Thanks,
> R P
>
>
>
>


Re: MongoDB Kafka Connect driver

2016-01-29 Thread Jay Kreps
Ah, agreed. This approach is actually quite common in change capture,
though. For many use cases getting the final value is actually preferable
to getting intermediates. The exception is usually if you want to do
analytics on something like number of changes.
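
Roughly, the "read a batch of changes and look up the final values" approach from my earlier mail (quoted below) could be sketched like this. An in-memory map stands in for the database and all names are illustrative; a real connector would issue a multi-get against the store instead.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch with an in-memory map standing in for the database.
class AfterImageSketch {
    // For a batch of changed ids, emit one record per id holding its current value.
    // Ids that no longer exist map to null (the document was deleted in the meantime).
    static Map<String, String> afterImages(List<String> changedIds,
                                           Map<String, String> database) {
        Set<String> distinct = new LinkedHashSet<>(changedIds); // collapse repeated changes
        Map<String, String> out = new LinkedHashMap<>();
        for (String id : distinct) {
            out.put(id, database.get(id)); // a real connector would batch this lookup
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> db = new LinkedHashMap<>();
        db.put("a", "v3");
        db.put("b", "v1");
        System.out.println(afterImages(java.util.Arrays.asList("a", "a", "b", "a", "c"), db));
    }
}
```

Three changes to the same id collapse into one output record carrying the latest value, which is why counting changes is the case this approach cannot serve.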

On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava <e...@confluent.io>
wrote:

> Jay,
>
> You can query after the fact, but you're not necessarily going to get the
> same value back. There could easily be dozens of changes to the document in
> the oplog so the delta you see may not even make sense given the current
> state of the document. Even if you can apply the delta, you'd still be
> seeing data that is newer than the update. You can of course take this
> shortcut, but it won't give correct results. And if the data has been
> deleted since then, you won't even be able to write the full record... As
> far as I know, the way the op log is exposed won't let you do something
> like pin a query to the state of the db at a specific point in the op log
> and you may be reading from the beginning of the op log, so I don't think
> there's a way to get correct results by just querying the DB for the full
> documents.
>
> Strictly speaking you don't need to get all the data in memory, you just
> need a record of the current set of values somewhere. This is what I was
> describing following those two options -- if you do an initial dump to
> Kafka, you could track only offsets in memory and read back full values as
> needed to apply deltas, but this of course requires random reads into your
> Kafka topic (but may perform fine in practice depending on the workload).
>
> -Ewen
>
> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Ewen, how come you need to get it all in memory for approach (1)? I
> > guess the obvious thing to do would just be to query for the record
> > after-image when you get the diff--e.g. just read a batch of changes and
> > multi-get the final values. I don't know how bad the overhead of this
> would
> > be...batching might reduce it a fair amount. The guarantees for this are
> > slightly different than the pure oplog too (you get the current value, not
> > necessarily every intermediate) but that should be okay for most
> > uses.
> >
> > -Jay
> >
> > On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
> e...@confluent.io>
> > wrote:
> >
> > > Sunny,
> > >
> > > As I said on Twitter, I'm stoked to hear you're working on a Mongo
> > > connector! It struck me as a pretty natural source to tackle since it
> > does
> > > such a nice job of cleanly exposing the op log.
> > >
> > > Regarding the problem of only getting deltas, unfortunately there is
> not
> > a
> > > trivial solution here -- if you want to generate the full updated
> record,
> > > you're going to have to have a way to recover the original document.
> > >
> > > In fact, I'm curious how you were thinking of even bootstrapping. Are
> you
> > > going to do a full dump and then start reading the op log? Is there a
> > good
> > > way to do the dump and figure out the exact location in the op log that
> > the
> > > query generating the dump was initially performed? I know that
> internally
> > > mongo effectively does these two steps, but I'm not sure if the
> necessary
> > > info is exposed via normal queries.
> > >
> > > If you want to reconstitute the data, I can think of a couple of
> options:
> > >
> > > 1. Try to reconstitute inline in the connector. This seems difficult to
> > > make work in practice. At some point you basically have to query for
> the
> > > entire data set to bring it into memory and then the connector is
> > > effectively just applying the deltas to its in memory copy and then
> just
> > > generating one output record containing the full document each time it
> > > applies an update.
> > > 2. Make the connector send just the updates and have a separate stream
> > > processing job perform the reconstitution and send to another topic. In
> > > this case, the first topic should not be compacted, but the second one
> > > could be.
> > >
> > > Unfortunately, without additional hooks into the database, there's not
> > much
> > > you can do besides this pretty heavyweight process. There may be some
> > > tricks you can use to reduce the amount of memory used during the
> process
> > > (e.g. keep a small cache of actual records and for the rest only store
> > >

Re: MongoDB Kafka Connect driver

2016-01-29 Thread Jay Kreps
Also, most databases provide a "full logging" option that lets you capture
the whole row in the log (I know Oracle and MySQL have this) but it sounds
like Mongo doesn't yet. That would be the ideal solution.

-Jay

On Fri, Jan 29, 2016 at 9:38 AM, Jay Kreps <j...@confluent.io> wrote:

> Ah, agreed. This approach is actually quite common in change capture,
> though. For many use cases getting the final value is actually preferable
> to getting intermediates. The exception is usually if you want to do
> analytics on something like number of changes.
>
> On Fri, Jan 29, 2016 at 9:35 AM, Ewen Cheslack-Postava <e...@confluent.io>
> wrote:
>
>> Jay,
>>
>> You can query after the fact, but you're not necessarily going to get the
>> same value back. There could easily be dozens of changes to the document
>> in
>> the oplog so the delta you see may not even make sense given the current
>> state of the document. Even if you can apply the delta, you'd still be
>> seeing data that is newer than the update. You can of course take this
>> shortcut, but it won't give correct results. And if the data has been
>> deleted since then, you won't even be able to write the full record... As
>> far as I know, the way the op log is exposed won't let you do something
>> like pin a query to the state of the db at a specific point in the op log
>> and you may be reading from the beginning of the op log, so I don't think
>> there's a way to get correct results by just querying the DB for the full
>> documents.
>>
>> Strictly speaking you don't need to get all the data in memory, you just
>> need a record of the current set of values somewhere. This is what I was
>> describing following those two options -- if you do an initial dump to
>> Kafka, you could track only offsets in memory and read back full values as
>> needed to apply deltas, but this of course requires random reads into your
>> Kafka topic (but may perform fine in practice depending on the workload).
>>
>> -Ewen
>>
>> On Fri, Jan 29, 2016 at 9:12 AM, Jay Kreps <j...@confluent.io> wrote:
>>
>> > Hey Ewen, how come you need to get it all in memory for approach (1)? I
>> > guess the obvious thing to do would just be to query for the record
>> > after-image when you get the diff--e.g. just read a batch of changes and
>> > multi-get the final values. I don't know how bad the overhead of this
>> > would be...batching might reduce it a fair amount. The guarantees for this
>> > are slightly different than the pure oplog too (you get the current value,
>> > not necessarily every intermediate) but that should be okay for most
>> > uses.
>> >
>> > -Jay
>> >
>> > On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava <
>> e...@confluent.io>
>> > wrote:
>> >
>> > > Sunny,
>> > >
>> > > As I said on Twitter, I'm stoked to hear you're working on a Mongo
>> > > connector! It struck me as a pretty natural source to tackle since it
>> > does
>> > > such a nice job of cleanly exposing the op log.
>> > >
>> > > Regarding the problem of only getting deltas, unfortunately there is
>> not
>> > a
>> > > trivial solution here -- if you want to generate the full updated
>> record,
>> > > you're going to have to have a way to recover the original document.
>> > >
>> > > In fact, I'm curious how you were thinking of even bootstrapping. Are
>> you
>> > > going to do a full dump and then start reading the op log? Is there a
>> > good
>> > > way to do the dump and figure out the exact location in the op log
>> that
>> > the
>> > > query generating the dump was initially performed? I know that
>> internally
>> > > mongo effectively does these two steps, but I'm not sure if the
>> necessary
>> > > info is exposed via normal queries.
>> > >
>> > > If you want to reconstitute the data, I can think of a couple of
>> options:
>> > >
>> > > 1. Try to reconstitute inline in the connector. This seems difficult
>> to
>> > > make work in practice. At some point you basically have to query for
>> the
>> > > entire data set to bring it into memory and then the connector is
>> > > effectively just applying the deltas to its in memory copy and then
>> just
>> > > generating one output record containing the full document each time it
>> > > applies an upd

Re: MongoDB Kafka Connect driver

2016-01-29 Thread Jay Kreps
Hey Ewen, how come you need to get it all in memory for approach (1)? I
guess the obvious thing to do would just be to query for the record
after-image when you get the diff--e.g. just read a batch of changes and
multi-get the final values. I don't know how bad the overhead of this would
be...batching might reduce it a fair amount. The guarantees for this are
slightly different than the pure oplog too (you get the current value, not
necessarily every intermediate) but that should be okay for most uses.
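
A minimal sketch of the "read a batch of changes and multi-get the final
values" idea, assuming a database client that can fetch several documents
by id in one call. The names `after_images`, `fake_multi_get` and the
in-memory `store` are hypothetical stand-ins, not part of any real driver.

```python
def after_images(changes, multi_get):
    """Return the current value of every document touched by a batch of changes."""
    # Collapse repeated ids so each document is fetched once per batch.
    ids = list(dict.fromkeys(change["id"] for change in changes))
    current = multi_get(ids)
    # A missing id means the document was deleted after the change was logged.
    return [(doc_id, current.get(doc_id)) for doc_id in ids]


# Hypothetical in-memory stand-in for the database.
store = {"a": {"id": "a", "qty": 3}, "b": {"id": "b", "qty": 7}}


def fake_multi_get(ids):
    return {doc_id: store[doc_id] for doc_id in ids if doc_id in store}


batch = [
    {"id": "a", "delta": {"qty": 2}},
    {"id": "b", "delta": {"qty": 7}},
    {"id": "a", "delta": {"qty": 3}},
    {"id": "c", "delta": {"qty": 1}},
]
result = after_images(batch, fake_multi_get)
```

Three changes to two live documents cost a single lookup, and the deleted
document "c" comes back as None, which is the case where the full record
can no longer be written.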

-Jay

On Fri, Jan 29, 2016 at 8:54 AM, Ewen Cheslack-Postava 
wrote:

> Sunny,
>
> As I said on Twitter, I'm stoked to hear you're working on a Mongo
> connector! It struck me as a pretty natural source to tackle since it does
> such a nice job of cleanly exposing the op log.
>
> Regarding the problem of only getting deltas, unfortunately there is not a
> trivial solution here -- if you want to generate the full updated record,
> you're going to have to have a way to recover the original document.
>
> In fact, I'm curious how you were thinking of even bootstrapping. Are you
> going to do a full dump and then start reading the op log? Is there a good
> way to do the dump and figure out the exact location in the op log that the
> query generating the dump was initially performed? I know that internally
> mongo effectively does these two steps, but I'm not sure if the necessary
> info is exposed via normal queries.
>
> If you want to reconstitute the data, I can think of a couple of options:
>
> 1. Try to reconstitute inline in the connector. This seems difficult to
> make work in practice. At some point you basically have to query for the
> entire data set to bring it into memory and then the connector is
> effectively just applying the deltas to its in memory copy and then just
> generating one output record containing the full document each time it
> applies an update.
> 2. Make the connector send just the updates and have a separate stream
> processing job perform the reconstitution and send to another topic. In
> this case, the first topic should not be compacted, but the second one
> could be.
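
Option 2 above can be sketched roughly as follows, assuming each update
arrives as a simplified delta with "set" and "unset" parts. That delta
shape, and the names `apply_delta` and `reconstitute`, are illustrative
assumptions rather than the actual oplog format or any Kafka API.

```python
def apply_delta(document, delta):
    """Return a new full document with the delta applied."""
    merged = dict(document or {})
    merged.update(delta.get("set", {}))
    for field in delta.get("unset", []):
        merged.pop(field, None)
    return merged


def reconstitute(updates):
    """Yield (key, full document) for every (key, delta) read from the first topic."""
    state = {}  # latest full document per key; a real job would persist this
    for key, delta in updates:
        state[key] = apply_delta(state.get(key), delta)
        yield key, state[key]


updates = [
    ("p1", {"set": {"name": "widget", "price": 10}}),
    ("p1", {"set": {"price": 12}}),
    ("p1", {"set": {"stock": 5}, "unset": ["name"]}),
]
full_records = list(reconstitute(updates))
```

Every output record carries the whole document, which is why the second
topic can be compacted while the first, delta-only topic cannot.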
>
> Unfortunately, without additional hooks into the database, there's not much
> you can do besides this pretty heavyweight process. There may be some
> tricks you can use to reduce the amount of memory used during the process
> (e.g. keep a small cache of actual records and for the rest only store
> Kafka offsets for the last full value, performing a (possibly expensive)
> random read as necessary to get the full document value back), but to get
> full correctness you will need to perform this process.
>
> In terms of Kafka Connect supporting something like this, I'm not sure how
> general it could be made, or that you even want to perform the process
> inline with the Kafka Connect job. If it's an issue that repeatedly arises
> across a variety of systems, then we should consider how to address it more
> generally.
>
> -Ewen
>
> On Tue, Jan 26, 2016 at 8:43 PM, Sunny Shah  wrote:
>
> >
> > Hi,
> >
> > We are trying to write a Kafka Connect connector for MongoDB. The issue
> > is that MongoDB does not provide the entire changed document for update
> > operations; it just provides the modified fields.
> >
> > If Kafka allowed custom log compaction, it would be possible to eventually
> > merge an entire document and its subsequent updates to create an entire
> > record again.
> >
> > As Ewen pointed out to me on Twitter, this is not possible, so what is
> > the Kafka Connect way of solving this issue?
> >
> > @Ewen, Thanks a lot for a really quick answer on twitter.
> >
> > --
> > Thanks and Regards,
> >  Sunny
> >
>
>
>
> --
> Thanks,
> Ewen
>


Re: kafka-producer-perf-test.sh - 0.8.2.1

2016-01-11 Thread Jay Kreps
If you use the perf test without any bound on throughput it will
always try to send data faster than it can go out and build up a queue
of unsent data. So e.g. if your buffer is 1MB each send will be
blocked on waiting for the full 1MB of queued data to clear out and
get sent. This makes sense if you think about it.

If you want to test latency under load you need to throttle the
maximum throughput to something like what you think you would see in
your application (there is an option for that in the command line
options).
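
As a rough back-of-the-envelope sketch of the queueing effect described
above: if the send buffer stays full, each record waits for roughly the
whole buffer to drain before it goes out. The 32 MB buffer below is an
assumption (the run's actual buffer.memory is not shown in the thread);
the 32.18 MB/sec figure is taken from the quoted output.

```python
def queueing_delay_ms(queued_bytes, drain_bytes_per_sec):
    """Time for a record at the back of a full buffer to reach the wire."""
    return 1000.0 * queued_bytes / drain_bytes_per_sec


MB = 1024 * 1024
throughput = 32.18 * MB  # bytes/sec, from the quoted ProducerPerformance run

# Assumed 32 MB producer buffer (not stated in the thread).
full_buffer_delay = queueing_delay_ms(32 * MB, throughput)

# The 1 MB buffer used as the example in the paragraph above.
small_buffer_delay = queueing_delay_ms(1 * MB, throughput)

print(f"32 MB buffer: ~{full_buffer_delay:.0f} ms, 1 MB buffer: ~{small_buffer_delay:.0f} ms")
```

Under that assumption the queueing delay alone is on the order of a
second, the same order of magnitude as the reported average latency,
while an end-to-end test that sends one message at a time never builds a
queue at all.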

-Jay

On Mon, Jan 11, 2016 at 11:02 AM, Andrej Vladimirovich
 wrote:
> Ewen,
>
> One more question. I mentioned that *kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance* latency is a lot higher
> than *kafka-run-class.sh kafka.tools.TestEndToEndLatency.*
>
> Example:
>
> *ProducerPerformance:*
>
> 5000 records sent, 337463.891364 records/sec (32.18 MB/sec), *1548.51
> ms avg latency*, 3186.00 ms max latency, 2478 ms 50th, 3071 ms 95th, 3118
> ms 99th, 3179 ms 99.9th.
>
> *TestEndToEndLatency:*
>
> Percentiles: 50th = 8, 99th = 9, 99.9th = 20
>
> So 1548.51 ms vs 9 ms. Huge difference.
>
> I am using the same cluster, same server and same topic to run both tests.
> It does not make any sense to me why End to End would be so low while
> Producer to Kafka is so large.
>
> I did some research online and found other people having the same question
> without any responses.
>
> Thanks a lot for your help!
>
> Andrew
>
> On Fri, Jan 8, 2016 at 5:44 PM, Ewen Cheslack-Postava 
> wrote:
>
>> It is single threaded in the sense that you can not request that multiple
>> threads be used to call producer.send(). However, the producer has its own
>> internal thread for doing network IO. When you have such a simple producer,
>> depending on the size of messages you can saturate a 1Gbps link with a
>> single thread, so usually using more threads isn't much help. If you still
>> need more throughput, you can just use more processes.
>>
>> -Ewen
>>
>> On Fri, Jan 8, 2016 at 1:24 PM, Andrej Vladimirovich <
>> udodizdu...@gmail.com>
>> wrote:
>>
>> > Thanks Ewen. Do you know if kafka-run-class.sh
>> > org.apache.kafka.clients.tools.ProducerPerformance
>> > is single threaded? Or is there any way to specify number of threads?
>> >
>> > On Fri, Jan 8, 2016 at 1:24 PM, Ewen Cheslack-Postava > >
>> > wrote:
>> >
>> > > Ah, sorry, I missed the version number in your title. I think this tool
>> > saw
>> > > some rearrangement in 0.9.0 and I was looking at the latest version.
>> > > Unfortunately it doesn't look like the old
>> > kafka.tools.ProducerPerformance
>> > > that is used in kafka-producer-perf-test.sh in 0.8.2.1 supports passing
>> > in
>> > > additional properties.
>> > >
>> > > -Ewen
>> > >
>> > > On Fri, Jan 8, 2016 at 9:10 AM, Andrej Vladimirovich <
>> > > udodizdu...@gmail.com>
>> > > wrote:
>> > >
>> > > > Ewen,
>> > > >
>> > > > I tried that before like this:
>> > > >
>> > > > ./kafka-producer-perf-test.sh --broker-list test:9092 --topics
>> test8-3
>> > > > --messages 200 --new-producer --message-size 200
>> > > > --show-detailed-stats max.request.size=1000
>> > > >
>> > > > and it does not work. It completely ignores this option.
>> > > >
>> > > > And --producer-props is not a valid option for
>> > > kafka-producer-perf-test.sh.
>> > > > Maybe it is not the right syntax? But I tried a lot of different ways
>> > and
>> > > > have yet to find the right one.
>> > > >
>> > > > Thanks!
>> > > >
>> > > > Andrew
>> > > >
>> > > > On Fri, Jan 8, 2016 at 10:54 AM, Ewen Cheslack-Postava <
>> > > e...@confluent.io>
>> > > > wrote:
>> > > >
>> > > > > Andrew,
>> > > > >
>> > > > > kafka-producer-perf-test.sh is just a wrapper around
>> > > > > org.apache.kafka.clients.tools.ProducerPerformance and all command
>> > > line
>> > > > > options should be forwarded. Can you just pass a --producer-props
>> to
>> > > set
>> > > > > max.request.size to a larger value?
>> > > > >
>> > > > > -Ewen
>> > > > >
>> > > > > On Fri, Jan 8, 2016 at 7:51 AM, Andrej Vladimirovich <
>> > > > > udodizdu...@gmail.com>
>> > > > > wrote:
>> > > > >
>> > > > > > Hi!
>> > > > > >
>> > > > > > I am testing Kafka's performance with large messages and would
>> like
>> > > to
>> > > > > > specify maximum request size when I run
>> > kafka-producer-perf-test.sh:
>> > > > > >
>> > > > > > ./kafka-producer-perf-test.sh --broker-list "test1:9092" --topics
>> > > > test8-3
>> > > > > > --messages 100 --new-producer --message-size 110
>> > > > > > --show-detailed-stats
>> > > > > >
>> > > > > > I always get this message if I specify something larger than 1MB:
>> > > > > >
>> > > > > > ERROR Error when sending message to topic test8-3 with key: 1
>> > bytes,
>> > > > > value:
>> > > > > > 110 bytes with error: The message is 1100027 bytes when
>> > > serialized
>> > > > > > which is larger than the maximum request size you have configured
>> > 

Re: Good java consumer example with new 0.9 consumer api

2015-12-07 Thread Jay Kreps
Hey guys,

Those docs were for a draft version of the API and I think they may be
a bit out of date. I'd stick with the javadocs linked here:
http://kafka.apache.org/documentation.html#newconsumerapi

-Jay

On Sun, Dec 6, 2015 at 7:40 AM, Dhyan Muralidharan
 wrote:
> Thank you. These docs are good. Thanks for your help.
>
> —Dhyan
>> On Dec 6, 2015, at 6:54 AM, jinxing  wrote:
>>
>> You can refer:
>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/
>>
>> At 2015-12-06 08:41:24, "Dhyan Muralidharan"  
>> wrote:
>>> Hi All,
>>>
>>> Can someone point me to a good Java consumer example that uses the new
>>> 0.9 consumer API? I searched on GitHub and couldn’t find one.
>>>
>>> —Dhyan
>


Re: how robust are file accesses?

2015-12-07 Thread Jay Kreps
Kafka should be robust against abrupt termination.

-Jay

On Sat, Dec 5, 2015 at 11:05 AM, Per Jahn  wrote:
> Hi
>
> Are files that kafka writes to the file system robust against abrupt 
> termination?
> Including fs meta data.
> And assuming it's used with a journaling fs.
> Or do I need ups and always shut down kafka gracefully to prevent data 
> corruption?
> I'm not afraid of losing data (have redundant servers), but I'm afraid of 
> file corruption that prevents kafka from starting without problems.
>
> I'm asking for the worst case of course, not what usually happens.
>
> Thanks.
>
> / Per
>


Re: New Consumer API + Reactive Kafka

2015-12-02 Thread Jay Kreps
It's worth noting that both the old and new consumer are identical in the
number of records fetched at once and this is bounded by the fetch size and
the number of partitions you subscribe to. The old consumer held these in
memory internally and waited for you to ask for them, the new consumer
immediately gives you what it has. Overall, though, the new consumer gives
much better control over what is being fetched since it only uses memory
when you call poll(); the old consumer had a background thread doing this
which would only stop when it filled up a queue of unprocessed
chunks...this is a lot harder to predict.
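
To make the bound concrete, here is a small arithmetic sketch, assuming
the per-partition limit is the one set by the new consumer's
max.partition.fetch.bytes and that every subscribed partition returns a
full fetch at once. The partition count and the 1 MB limit are
illustrative values, not taken from this thread.

```python
def max_fetch_bytes(partitions, per_partition_fetch_bytes):
    """Worst-case bytes held by one round of fetches across all partitions."""
    return partitions * per_partition_fetch_bytes


# Illustrative values: 12 subscribed partitions, 1 MB per-partition fetch limit.
bound = max_fetch_bytes(partitions=12, per_partition_fetch_bytes=1024 * 1024)
print(f"worst case per fetch round: {bound / (1024 * 1024):.0f} MB")
```

A wrapper that needs in-memory buffering can size that buffer from this
bound, or lower the per-partition limit to shrink it.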

-Jay

On Wed, Dec 2, 2015 at 7:13 AM, Gwen Shapira  wrote:

> On Wed, Dec 2, 2015 at 10:44 PM, Krzysztof Ciesielski <
> krzysztof.ciesiel...@softwaremill.pl> wrote:
>
> > Hello,
> >
> > I’m the main maintainer of Reactive Kafka - a wrapper library that
> > provides Kafka API as Reactive Streams (
> > https://github.com/softwaremill/reactive-kafka).
> > I’m a bit concerned about switching to Kafka 0.9 because of the new
> > Consumer API which doesn’t seem to fit well into this paradigm, compared
> > to the old one. My main concerns are:
> >
> > 1. Our current code uses the KafkaIterator and reads messages
> > sequentially, then sends them further upstream. In the new API, you
> cannot
> > control how many messages are returned with poll(), so we would need to
> > introduce some kind of in-memory buffering.
> > 2. You cannot specify which offsets to commit. Our current native
> > committer (
> >
> https://github.com/softwaremill/reactive-kafka/blob/4055e88c09b8e08aefe8dbbd4748605df5779b07/core/src/main/scala/com/softwaremill/react/kafka/commit/native/NativeCommitter.scala
> )
> > uses the OffsetCommitRequest/Response API and
> > kafka.api.ConsumerMetadataRequest/Response for resolving brokers.
> Switching
> > to Kafka 0.9 brings some compilation errors that raise questions.
> >
> > My questions are:
> >
> > 1. Do I understand the capabilities and limitations of new API correctly?
> > :)
> >
>
> The first limitation is correct - poll() may return any number of records
> and you need to handle this.
> The second is not correct - commitSync() can take a map of TopicPartition
> and Offsets, so you would only commit specific offsets of specific
> partitions.
>
>
>
> > 2. Can we stay with the old iterator-based client, or is it going to get
> > abandoned in future Kafka versions, or discouraged for some reasons?
> >
>
> It is already a bit behind - only the new client includes support for
> secured clusters (authentication and encryption). It will get deprecated in
> the future.
>
>
> > 3. Can we still use the OffsetCommitRequest/Response API to commit
> > messages manually? If yes, could someone update this example:
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> or
> > give me a few hints on how to do this with 0.9?
> >
>
> AFAIK, the wire protocol and the API are not going anywhere. Hopefully you
> can use the new objects we provide in the clients jar
> (org.apache.kafka.common.requests).
>
>
> >
> > By the way, we’d like our library to appear on the Ecosystem Wiki, I’m
> not
> > sure how to request that officially :)
> >
>
> Let us know what to write there and where to link :)
>
>
> >
> > —
> > Bests,
> > Chris
> > SoftwareMill
>


Kafka Summit Registration and CFP

2015-12-02 Thread Jay Kreps
Hey Everyone,

As you may have heard, Confluent is hosting the first ever Kafka
Summit. It'll be in San Francisco on Tuesday, April 26, 2016.

We'll be announcing open registration tomorrow, but I wanted to let
everyone here know first, and also let you know there is a $50
community discount. To get the discount enter this promotional code:
COMMUNITY-KS2016-50D

Don't put the discount code on twitter, it's just for the people on
the mailing list :-)

Also, the call for proposals is open--we'd love to have you give a
talk on how you're using Kafka, your experience with stream processing
frameworks and applications, or anything else in the streaming data
space. Submission deadline is January 11.

Hope to see you all there!

http://www.kafka-summit.org

Cheers,

-Jay


Re: Any gotchas upgrading to 0.9?

2015-12-01 Thread Jay Kreps
I think the point is that we should ideally try to cover all these in the
"upgrade" notes.

-Jay

On Tue, Dec 1, 2015 at 10:37 AM, Aditya Auradkar 
wrote:

> Rajiv,
>
> By default, quotas are unlimited until you decide to configure them
> explicitly.
> And yes, we did get rid of "replica.lag.max.messages", so that
> configuration will no longer apply.
>
> Aditya
>
> On Tue, Dec 1, 2015 at 10:24 AM, Todd Snyder 
> wrote:
>
> > The quota page is here:
> > http://kafka.apache.org/documentation.html#design_quotas
> >
> > "By default, each unique client-id receives a fixed quota in bytes/sec as
> > configured by the cluster (quota.producer.default,
> quota.consumer.default)"
> >
> >
> > I also noticed there's been a change in the replication configuration
> > while reading:
> >
> http://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
> >
> > It may not break anything, but it may impact how you decide to configure
> > and monitor replication
> >
> > "Now there is only one value you need to configure on the server which is
> > replica.lag.time.max.ms. The interpretation of this has changed to be
> the
> > time for which a replica has been out-of-sync with the leader. Stuck or
> > failed replicas are detected the same way as before - if a replica fails
> to
> > send a fetch request for longer than replica.lag.time.max.ms, it is
> > considered dead and is removed from the ISR. The mechanism of detecting
> > slow replicas has changed - if a replica starts lagging behind the leader
> > for longer than replica.lag.time.max.ms, then it is considered too slow
> > and is removed from the ISR. So even if there is a spike in traffic and
> > large batches of messages are written on the leader, unless the replica
> > consistently remains behind the leader for replica.lag.time.max.ms, it
> > will not shuffle in and out of the ISR."
> >
> >
> >
> > -Original Message-
> > From: Rajiv Kurian [mailto:ra...@signalfx.com]
> > Sent: Tuesday, December 01, 2015 11:57
> > To: users@kafka.apache.org
> > Subject: Re: Any gotchas upgrading to 0.9?
> >
> > Also I remember reading (can't find now) something about default traffic
> > quotas. I'd hope the default quotas are very large (infinite?) and not
> > small so that compatibility is maintained. It would be very unfortunate
> if
> > some of our traffic was throttled because of the upgrade because of magic
> > defaults. For example we have a certain cluster dedicated to serving a
> > single important topic and we'd hate for it to be throttled because of
> > incorrect defaults.
> >
> > Thanks,
> > Rajiv
> >
> > On Tue, Dec 1, 2015 at 8:54 AM, Rajiv Kurian  wrote:
> >
> > > I saw the upgrade path documentation at
> > > http://kafka.apache.org/documentation.html and that kind of answers
> (1).
> > > Not sure if there is anything about client compatibility though.
> > >
> > >
> > > On Tue, Dec 1, 2015 at 8:51 AM, Rajiv Kurian 
> wrote:
> > >
> > >> I plan to upgrade both the server and clients to 0.9. Had a few
> > questions
> > >> before I went ahead with the upgrade:
> > >>
> > >> 1. Do all brokers need to be on 0.9? Currently we are running 0.8.2.
> > We'd
> > >> ideally like to convert only a few brokers to 0.9 and only if we don't
> > see
> > >> problems convert the rest.
> > >>
> > >> 2. Is it possible to run Kafka 0.9 clients (specifically the consumer)
> > >> with Kafka 0.8.2 brokers?
> > >>
> > >> Any link to the upgrade path would be really useful.
> > >>
> > >> Thanks,
> > >> Rajiv
> > >>
> > >
> > >
> >
>


Re: 0.9.0 Client - Streaming vs Polling

2015-11-26 Thread Jay Kreps
Actually this may be something we can improve in the documentation. Calling
poll(1000) doesn't mean "check for new messages every second" but rather
"return the instant there are new messages, but if no messages come return
after a one second timeout passes".

So in that respect both the old and new consumer are neither polling nor
streaming but rather a kind of "long poll" if you're familiar with that
term.
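
The "long poll" behaviour can be illustrated with a plain in-process
queue. This is only a model of the semantics described above, not the
consumer's implementation; `long_poll` and the `messages` queue are
hypothetical names.

```python
import queue
import threading
import time


def long_poll(q, timeout_s):
    """Return as soon as anything is available, or [] once the timeout passes."""
    try:
        first = q.get(timeout=timeout_s)  # blocks only while the queue is empty
    except queue.Empty:
        return []
    batch = [first]
    while True:  # hand over whatever else is already waiting
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            return batch


messages = queue.Queue()
# A message shows up 50 ms into a one-second poll.
threading.Timer(0.05, messages.put, args=("hello",)).start()

start = time.monotonic()
batch = long_poll(messages, timeout_s=1.0)
waited = time.monotonic() - start

start = time.monotonic()
empty = long_poll(messages, timeout_s=0.2)
empty_wait = time.monotonic() - start

print(batch, f"{waited:.2f}s", empty, f"{empty_wait:.2f}s")
```

The first call returns shortly after the message arrives rather than
after the full second; only the second call, with nothing to deliver,
waits out its timeout.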

-Jay

On Thursday, November 26, 2015, Erik Pragt  wrote:

> Hi all,
>
> We're using Apache Kafka 0.9, and in our 0.8.2 Kafka, we had a consumer
> which used a ConsumerConnector to stream all the messages to be processed.
>
> The new KafkaConsumer seems to use a polling mechanism instead of
> streaming, and I was wondering: what is the 'right' way to go forward?
> Should I poll with 0 millis? Should I poll every 100ms and accept the
> polling delay? Should I stick to the previous way and keep using the
> ConsumerConnector to stream the messages per topic?
>
> I'm just wondering what the best way forward is, and I hope somebody can
> give me a bit of advice in this.
>
> Thanks,
>
> Erik
>


Re: Kafka Connect and Spark/Storm Comparisons

2015-11-25 Thread Jay Kreps
Hey Dave,

We're separating the problem of getting data in and out of Kafka from the
problem of transforming it. If you think about ETL (Extract, Transform,
Load), what Kafka Connect does is E and L really really well and not T at
all; the focus in stream processing systems is T with E and L being a bit
of a necessary evil. If you are trying to get a single stream of data for
one application, directly using Storm or Spark with appropriate plugins is
totally reasonable. If you are trying to capture a bunch of different data
sources for multiple uses these systems get really awkward really fast.

Imagine a world in which you wanted to capture a significant portion of
what happened in your company as real-time streams and where there are many
things that use data. You could theoretically set up a Storm or Spark job
for each database table purely for the purpose of loading data but managing
this would be a bit of a nightmare. I think this is where Kafka Connect
really shines.

The other advantage of this is that transformation of data is inherently a
deep problem that is close to the programmer. There is lots of room here
for query languages, frameworks in different languages, etc. On the other
hand ingress and egress is much more well defined problem.

So the approach we're building towards is one where data is captured more
or less as it is, at large scale, and then is available for further
transformation or loading into many other systems. The transformation would
be the role of the stream processing systems and the loading and unloading
the role of Kafka Connect.

The advantages Kafka Connect has are the following:
- No additional cluster is needed, it directly co-ordinates with the Kafka
cluster
- It does a good job of capturing schema information from sources if it is
present
- It does a good job of handling scalable data capture--if you want to add
a new table to the set of things you're pulling data from that is just a
simple REST call not another job to manually configure and manage.
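
As a hedged illustration of the "simple REST call" in the last point, the
sketch below builds (but does not send) a request that registers a
connector with a Kafka Connect worker. The worker address, connector name,
file path and topic are placeholders; the file source connector stands in
for whatever database connector would actually be used, which would have
its own class name and config keys.

```python
import json
import urllib.request


def new_connector_request(base_url, name, config):
    """Build a POST to the Connect worker's /connectors endpoint."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = new_connector_request(
    "http://localhost:8083",  # placeholder worker address
    "orders-source",          # placeholder connector name
    {
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "tasks.max": "1",
        "file": "/tmp/orders.txt",  # placeholder input file
        "topic": "orders",          # placeholder target topic
    },
)
# urllib.request.urlopen(request) would submit it to a running worker.
```

Adding another source is then another request of the same shape rather
than another job to deploy.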

Hope that sheds some light on things.

-Jay

On Wed, Nov 25, 2015 at 7:50 AM, Dave Ariens  wrote:

> I just finished reading up on Kafka Connect<
> http://kafka.apache.org/documentation.html#connect> and am trying to wrap
> my head around where it fits within the big data ecosystem.
>
> Other than the high level overview provided in the docs I haven't heard
> much about this feature. My limited understanding of it so far is that it
> includes semantics similar to Storm (sources/spouts, sinks/bolts) and
> allows for distributed processing of streams using tasks that handle data
> defined in records conforming to a schema.  Assuming that's mostly
> accurate, is anyone able to speak to why a developer would want to use
> Kafka Connect over Spark (or maybe even Storm but to a lesser degree)?  Is
> Kafka Connect trying to address any shortcomings?  I understand it greatly
> simplifies offset persistence but that's not terribly difficult to
> implement on top of Spark (see my offset persistence hack<
> https://gist.github.com/ariens/e6a39bc3dbeb11467e53>).  Where is Kafka
> Connect being targeted within the vast ecosystem that is big data?
>
> Does Kafka Connect offer efficiencies 'under the hood' taking advantage of
> data locality and the fact that it distributes workload on the actual Kafka
> cluster itself?
>
> I can see basic ETL and data warehouse bulk operations simplified where
> one just wants an easy way to get all data in/out of Kafka and reduce the
> network IO of having multiple compute clusters but for any data science
> type operations (machine learning, etc) I would expect working with Spark's
> RDDs to be more efficient.
>
>


Re: Performance Test Inquiries

2015-09-01 Thread Jay Kreps
Yeah okay never mind I misunderstood. I'm not sure of the cause either--I
didn't see that in my testing.

The spindle thing tends to get overestimated since our writes are async.
There is some perf hit from having multiple partitions (maybe 10-20%) but
mostly the OS does a good job of scheduling writes in order.

There is a strong effect from batching--in general fewer partitions will
lead to more efficient batching and hence more efficient network and
filesystem usage. My expectation, though, would be that you would get
plenty of batching in either test because the throughput is so high. You
could investigate this by setting linger.ms=1 and seeing if that changes
anything.
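
A deliberately simplified model of the batching effect, assuming records
are spread evenly over the partitions and a batch is sent either when it
is full or when the linger window ends. It ignores per-record overhead
and the producer's opportunistic batching, and the send rates are
hypothetical, so treat it as intuition only.

```python
def records_per_batch(total_records_per_sec, partitions, linger_ms,
                      batch_size_bytes, record_bytes):
    """Approximate records accumulated per partition batch in one linger window."""
    cap = batch_size_bytes // record_bytes
    arrived = total_records_per_sec / partitions * linger_ms / 1000.0
    return min(cap, max(1, int(arrived)))


# batch.size=8192 and 100-byte records, as in the quoted tests; linger.ms=1.
high_one = records_per_batch(300_000, 1, 1, 8192, 100)
high_three = records_per_batch(300_000, 3, 1, 8192, 100)
low_one = records_per_batch(10_000, 1, 1, 8192, 100)
low_three = records_per_batch(10_000, 3, 1, 8192, 100)

print(high_one, high_three, low_one, low_three)
```

At the high hypothetical rate both layouts fill their batches, which
matches the expectation above; the partition count only starts to hurt
batching when the per-partition rate is low.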

-Jay

On Mon, Aug 31, 2015 at 8:14 PM, explorer <jind...@gmail.com> wrote:

> Hi Jay,
>
> For issue #1, I will file a JIRA so the community or dev team can take
> a look at it.
>
> For my 2nd question, the scenario goes like this:
>
> #Test_1
>
> $ bin/kafka-topics.sh --zookeeper 192.168.1.1:2181 --create --topic
> test1 --partitions 3 --replication-factor 1
> Created topic "test1".
>
> $ time bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance test1 10485 10
> -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=67108864
> batch.size=8192
> 10485 records sent, 348.535718 records/sec (33.24 MB/sec)
>
> real0m30.623s
> user0m6.704s
> sys 0m1.023s
>
> #Test_2
>
> $bin/kafka-topics.sh --zookeeper 192.168.1.1:2181 --create --topic
> test2 --partitions 1 --replication-factor 1
> Created topic "test2".
>
> $ time bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance test1 10485 10
> -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=67108864
> batch.size=8192
> 10485 records sent, 1086.303357 records/sec (103.60 MB/sec)
>
> real 0m10.185s
> user 0m9.138s
> sys 0m1.879s
>
> Literally, #Test_2 ran faster (10 s) and gave better throughput
> (103MB/sec).  The numbers from #Test_1 are just one-third that of
> #Test_2.  The topic test1 and test2 differ in such a way that test1
> was created with 3 partitions (each partition sits on different
> physical server) while test2 only has one partition.
>
> My understanding of partition is that more partitions (each on
> different spindle) would lead to better read/write performance because
> of the aggregate bandwidth derived from parallel operations.  But what
> I am seeing here defies my understanding.  I just wonder if I did the
> benchmark tests wrong or I had the concept wrong.
>
> Kind Regards,
>
> Paul
>
> On Tue, Sep 1, 2015 at 12:57 AM, Jay Kreps <j...@confluent.io> wrote:
> > In general increasing message size should increase bytes/sec throughput
> > since much of the work is on a per-message basis. I think the question
> > remains why raising the buffer size with fixed message size would drop
> the
> > throughput. Sounds like a bug if you can reproduce it consistently. Want
> to
> > file a JIRA and see if others can reproduce the same thing?
> >
> > For the multi-server test I may have misread your email. When you say you
> > see 33MB/sec across 3 servers does that mean an aggregate of ~100MB/sec?
> I
> > was assuming yes and what you were seeing was that you were maxing out
> the
> > client's bandwidth so as you added servers each server got a smaller
> chunk
> > of the ~100MB/sec client bandwidth. Maybe that's not what you're saying,
> > though.
> >
> > -Jay
> >
> > On Mon, Aug 31, 2015 at 9:49 AM, explorer <jind...@gmail.com> wrote:
> >
> >> Hi Jay,
> >>
> >> Thanks for the response.
> >>
> >> The second command was indeed a typo.  It should have been
> >>
> >> bin/kafka-run-class.sh
> >> org.apache.kafka.clients.tools.ProducerPerformance test1 5000 100
> >> -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=134217728
> >> batch.size=8192
> >>
> >> And the throughput would drop to ~9MB/sec.
> >>
> >> But if I increase the message size, say 10,000 bytes per message
> >>
> >> bin/kafka-run-class.sh
> >> org.apache.kafka.clients.tools.ProducerPerformance test1 50 1
> >> -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=134217728
> >> batch.size=8192
> >>
> >> The throughput would bounce back to ~33MB/sec.
> >>
> >> I am playing with these numbers just to get a pattern as to what kind of
> >> combination
> >> would serve us the best as far as message size goes.  So it would help
> >> better if we
> >> can safely say that h

Re: Performance Test Inquiries

2015-08-31 Thread Jay Kreps
The second command you give actually doesn't seem to double the memory
(maybe just a typo?). I can't explain why doubling buffer memory would
decrease throughput. The only effect of adding memory would be if you run
out, and then running out of memory would cause you to block and hence
lower throughput. So more memory should only be able to help (or have no
effect). I wonder if something else was different between the tests?

For the second test is it possible that you are on 1 gigabit ethernet? 1
gigabit/sec ~= 100 MB/sec once you account for the protocol overhead (TCP
and Kafka's protocol).
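
The arithmetic behind that estimate, as a small sketch. The 20% overhead
fraction is a placeholder chosen only to land near the ~100 MB/sec figure
above, not a measurement; the 33.2 MB/sec per-node reading comes from the
quoted message.

```python
def line_rate_mb_per_sec(bits_per_sec):
    """Convert a link speed in bits/sec to megabytes/sec (10^6 bytes)."""
    return bits_per_sec / 8 / 1e6


raw = line_rate_mb_per_sec(1e9)   # 1 gigabit ethernet, before any overhead
assumed_overhead = 0.20           # placeholder fraction, not measured
usable = raw * (1 - assumed_overhead)

aggregate = 3 * 33.2              # three brokers at the quoted per-node rate

print(f"raw {raw:.0f} MB/sec, usable ~{usable:.0f} MB/sec, aggregate {aggregate:.1f} MB/sec")
```

If the 33.2 MB/sec reading is per broker, three brokers add up to about
the same ~100 MB/sec a single partition reached, which is what a client
limited by its own network link would look like.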

-Jay

On Mon, Aug 31, 2015 at 3:14 AM, explorer  wrote:

> Hi all,
>
> Since my company is considering adopting Kafka as our message bus, I
> have been assigned the task to perform some benchmark tests.  I
> basically followed what Jay wrote on this article
> <
> http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> >
>
> The benchmarks were set up using 4 nodes with one node acting as both
> producer and consumer while the rest function as Kafka brokers.
>
> This is the baseline (50M messages (100 bytes each), 64MB buffer
> memory, and 8192 batch size)
>
> bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance test1 5000 100
> -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=67108864
> batch.size=8192
>
> which on our setup yielded the result of
>
> 5000 records sent, 265939.057406 records/sec (25.36 MB/sec)
>
> However, by doubling the buffer.memory to 128M
>
> bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance test1 50 1
> -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=67108864
> batch.size=8192
>
> The throughput dropped significantly.
>
> 5000 records sent, 93652.601295 records/sec (8.93 MB/sec)
>
> Anyone able to interpret why the throughput degraded so much?
>
> Likewise, when performing benchmarks using 3 partitions across 3
> nodes, the maximum throughput shown is roughly 33.2MB/sec, whereas a
> single partition (on a single node) yields 100MB/sec.
>
> My guess is that on a 3-node setup, I need to multiply the 33.2
> MB/sec reading by 3 since the 33.2MB/sec reading only represents
> the bandwidth available to one single node.
>
> Again, anyone out there willing to shed some light on how to
> interpret the numbers correctly?
>
> Cheers,
>
> Paul
>


Re: Performance Test Inquiries

2015-08-31 Thread Jay Kreps
In general increasing message size should increase bytes/sec throughput
since much of the work is on a per-message basis. I think the question
remains why raising the buffer size with fixed message size would drop the
throughput. Sounds like a bug if you can reproduce it consistently. Want to
file a JIRA and see if others can reproduce the same thing?

For the multi-server test I may have misread your email. When you say you
see 33MB/sec across 3 servers does that mean an aggregate of ~100MB/sec? I
was assuming yes and what you were seeing was that you were maxing out the
client's bandwidth so as you added servers each server got a smaller chunk
of the ~100MB/sec client bandwidth. Maybe that's not what you're saying,
though.
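
The bandwidth reasoning in this thread can be checked with a little arithmetic. The sketch below is only a rough model: the 15% protocol overhead is an assumed figure chosen for illustration (the thread only says "~100MB/sec"), and the class and method names are hypothetical.

```java
class LinkBudget {
    // Usable payload bandwidth in MB/sec for a link of the given speed,
    // after subtracting an assumed fractional protocol overhead.
    static double usableMBPerSec(double linkGigabits, double overheadFraction) {
        double rawBytesPerSec = linkGigabits * 1_000_000_000.0 / 8.0; // bits -> bytes
        return rawBytesPerSec * (1.0 - overheadFraction) / 1_000_000.0;
    }

    // Share each broker sees when a single client spreads its traffic evenly.
    static double perBrokerShare(double clientMBPerSec, int brokers) {
        return clientMBPerSec / brokers;
    }

    public static void main(String[] args) {
        // 0.15 is an assumption, not a measured overhead for TCP + Kafka.
        double usable = usableMBPerSec(1.0, 0.15);
        System.out.println("usable client bandwidth (MB/sec): " + usable);
        System.out.println("per-broker share with 3 brokers:  " + perBrokerShare(usable, 3));
    }
}
```

If the client NIC is the bottleneck, the per-broker number shrinks as brokers are added while the aggregate stays flat, which would match a ~33MB/sec per-broker reading on 3 brokers.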

-Jay

On Mon, Aug 31, 2015 at 9:49 AM, explorer <jind...@gmail.com> wrote:

> Hi Jay,
>
> Thanks for the response.
>
> The second command was indeed a typo.  It should have been
>
> bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance test1 5000 100
> -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=134217728
> batch.size=8192
>
> And the throughput would drop to ~9MB/sec.
>
> But if I increase the message size, say 10,000 bytes per message
>
> bin/kafka-run-class.sh
> org.apache.kafka.clients.tools.ProducerPerformance test1 50 1
> -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=134217728
> batch.size=8192
>
> The throughput would bounce back to ~33MB/sec.
>
> I am playing with these numbers just to find a pattern as to what kind of
> combination would serve us best as far as message size goes.  So it would
> help if we could safely say that higher buffer memory gives better
> performance, but only to a certain extent.
>
> But in our test context, I see lower throughput with a larger memory
> buffer.  Once I increase the message size, the throughput seems normal
> again.  This is the confusing point.
>
> For the second part, I am indeed on a 1 gigabit Ethernet.  I just feel
> confused why
>
> A single partition (on a single broker) test yields 100MB/sec throughput
>
> while
>
> 3 partitions on 3 brokers (all on different physical server) gave me the
> reading of 33MB/sec
>
> and to make it more clear
>
> 2 partitions on 2 brokers (on different physical server too) gave me the
> reading of 25MB/sec
>
> I just wanna know how to interpret these numbers so I can draw a pattern
> but so far this is
> not very consistent (more partitions = less throughput?)
>
> Cheers,
>
> Paul
>
>
>
> On Tue, Sep 1, 2015 at 12:09 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > The second command you give actually doesn't seem to double the memory
> > (maybe just a typo?). I can't explain why doubling buffer memory would
> > decrease throughput. The only effect of adding memory would be if you run
> > out, and then running out of memory would cause you to block and hence
> > lower throughput. So more memory should only be able to help (or have no
> > effect). I wonder if something else was different between the tests?
> >
> > For the second test is it possible that you are on 1 gigabit ethernet? 1
> > gigabit ~= 100mb once you account for the protocol overhead (TCP and
> > Kafka's protocol).
> >
> > -Jay
> >
> > On Mon, Aug 31, 2015 at 3:14 AM, explorer <jind...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > Since my company is considering adopting Kafka as our message bus, I
> > > have been assigned the task to perform some benchmark tests.  I
> > > basically followed what Jay wrote on this article
> > > <
> > >
> >
> http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
> > > >
> > >
> > > The benchmarks were set up using 4 nodes with one node acting as both
> > > producer and consumer while the rest function as Kafka brokers.
> > >
> > > This is the baseline (50M messages (100 bytes each) ,64MB buffer
> > > memory, and 8192 batch size)
> > >
> > > bin/kafka-run-class.sh
> > > org.apache.kafka.clients.tools.ProducerPerformance test1 5000 100
> > > -1 acks=1 bootstrap.servers=192.168.1.1:9092 buffer.memory=67108864
> > > batch.size=8192
> > >
> > > which on our setup yielded the result of
> > >
> > > 5000 records sent, 265939.057406 records/sec (25.36 MB/sec)
> > >
> > > However, by doubling the buffer.memory to 128M
> > >
> > > bin/kafka-run-class.sh
> > > org.apache.kafka.clients.tools.ProducerPerformance

Re: Reduce latency

2015-08-17 Thread Jay Kreps
Yuheng,

From the command you gave it looks like you are configuring the perf test
to send data as fast as possible (the -1 for target throughput). This means
it will always queue up a bunch of unsent data until the buffer is
exhausted and then block. The larger the buffer, the bigger the queue. This
is where the latency comes from. This is exactly what you would expect and
what the buffering is supposed to do.

If you want to measure latency this test doesn't really make sense, you
need to measure with some fixed throughput. Instead of -1 enter the target
throughput you want to measure latency at (e.g. 10 records/sec).
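
A back-of-the-envelope sketch of why a full buffer translates into latency: a record appended behind a full buffer waits roughly (queued bytes / drain rate). The drain rates below are hypothetical values for illustration; only the 67108864-byte buffer.memory comes from the command in this thread.

```java
class BufferLatency {
    // Approximate time in seconds a record waits behind a full buffer
    // that drains at a steady rate: queued bytes / drain rate.
    static double queueDelaySeconds(long bufferBytes, double drainBytesPerSec) {
        if (drainBytesPerSec <= 0) {
            throw new IllegalArgumentException("drain rate must be positive");
        }
        return bufferBytes / drainBytesPerSec;
    }

    public static void main(String[] args) {
        long bufferBytes = 67_108_864L;          // buffer.memory from the thread
        double[] drainRatesMB = {5, 25, 100};    // hypothetical per-producer rates
        for (double mb : drainRatesMB) {
            double delay = queueDelaySeconds(bufferBytes, mb * 1024 * 1024);
            System.out.println(mb + " MB/sec drain -> ~" + delay + " s queued");
        }
    }
}
```

With many producers sharing the brokers, each producer's drain rate is small, so multi-second queueing delay is what this model predicts when the target throughput is -1.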

-Jay

On Thu, Aug 13, 2015 at 12:18 PM, Yuheng Du yuheng.du.h...@gmail.com
wrote:

 Thank you Alvaro,

 How do I use sync producers? I am running the standard ProducerPerformance
 test from kafka to measure the latency of each message to send from
 producer to broker only.
 The command is like bin/kafka-run-class.sh
 org.apache.kafka.clients.tools.ProducerPerformance test7 5000 100 -1
 acks=1 bootstrap.servers=esv4-hcl198.grid.linkedin.com:9092
 buffer.memory=67108864 batch.size=8196

 For running producers, where should I put the producer.type=sync
 configuration? In config/server.properties? Also, does this mean we
 are using a batch size of 1? Which version of Kafka are you using?
 thanks.

 On Thu, Aug 13, 2015 at 3:01 PM, Alvaro Gareppe agare...@gmail.com
 wrote:

  Are you measuring latency as time between producer and consumer ?
 
  In that case, the ack shouldn't affect the latency, because even though your
  producer is not going to wait for the ack, the consumer will only get the
  message after it's committed in the server.
 
  About latency, my best results occur with sync producers, but the throughput
  is much lower in that case.
 
  About not flushing to disk I'm pretty sure that it's not an option in
 kafka
  (correct me if I'm wrong)
 
  Regards,
  Alvaro Gareppe
 
  On Thu, Aug 13, 2015 at 12:59 PM, Yuheng Du yuheng.du.h...@gmail.com
  wrote:
 
   Also, the latency results show no major difference when using ack=0 or
   ack=1. Why is that?
  
   On Thu, Aug 13, 2015 at 11:51 AM, Yuheng Du yuheng.du.h...@gmail.com
   wrote:
  
I am running an experiment where 92 producers are publishing data into 6
brokers and 10 consumers are reading online data simultaneously.
   
What should I do to reduce the latency? Currently when I run the producer
performance test the average latency is around 10s.
   
Should I disable log.flush? How do I do that? Thanks.
   
  
 
 
 
  --
  Ing. Alvaro Gareppe
  agare...@gmail.com
 



Re: Checkpointing with custom metadata

2015-08-04 Thread Jay Kreps
Hey James,

You are right the intended use of that was to have a way to capture some
very small metadata about your state at the time of offset commit in an
atomic way.

That field isn't exposed but we do need to add it to the new consumer api
(I think just no one has done it yet).

-Jay

On Mon, Aug 3, 2015 at 1:52 PM, James Cheng jch...@tivo.com wrote:

 According to
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest,
 we can store custom metadata with our checkpoints. It looks like the high
 level consumer does not support committing offsets with metadata, and that
 in order to checkpoint with custom metadata, we have to issue the
 OffsetCommitRequest ourselves. Is that correct?

 Thanks,
 -James




Re: New consumer - offset one gets in poll is not offset one is supposed to commit

2015-07-28 Thread Jay Kreps
It seems less weird if you think of the offset as the position of the
consumer, i.e. it is on record 5. In some sense the consumer is actually
in between records, i.e. if it has processed 4 and not processed 5 do you
think about your position as being on 4 or on 5? Well not on 4 because it
already processed 4, and not on 5 because it hasn't got that yet, it really
is in between. But in English it seems most natural to think of it as being
on record 5 in the same sense you'd tell your boss you're working on the
thing you're starting not the thing you most recently completed.

I do agree the advantage of your preferred approach is that the commit
offset is the last record you read. The disadvantage is that the initial
position is now -1, which is also weird.

For log compacted topics this still works fine. Your position remains 6
even if there is no longer a message with offset 6, a fetch request at
position 6 returns messages starting with the first offset after 6. This
would be necessary regardless of the offset scheme.
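
A minimal sketch of the "position" view of offsets discussed in this thread, assuming a non-compacted partition where offsets are consecutive. The class and method names are hypothetical and this is not the KafkaConsumer API; it only shows the offset+1 arithmetic and the "largest consecutive ACKed offset" bookkeeping Stevo describes.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class CommitPosition {
    // The position to commit after processing a record is the offset of the
    // next record to read, i.e. lastProcessedOffset + 1.
    static long positionAfter(long lastProcessedOffset) {
        return lastProcessedOffset + 1;
    }

    // Advance the committed position past every consecutively ACKed offset.
    // Assumes offsets have no gaps (not true for compacted topics).
    static long advance(long committedPosition, Set<Long> ackedOffsets) {
        long position = committedPosition;
        while (ackedOffsets.contains(position)) {
            position++;
        }
        return position;
    }

    public static void main(String[] args) {
        // Nothing processed yet: the position is 0, not -1.
        long position = 0;
        // Offsets 0, 1 and 3 were ACKed out of order; 2 is still in flight.
        Set<Long> acked = new HashSet<>(Arrays.asList(0L, 1L, 3L));
        position = advance(position, acked);
        System.out.println("commit position: " + position);
    }
}
```

Here the committed position stops at 2 because record 2 is unprocessed, even though 3 has been ACKed.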

-Jay

On Tue, Jul 28, 2015 at 6:31 PM, tao xiao xiaotao...@gmail.com wrote:

 Correct me if I'm wrong. If compaction is used, +1 to indicate the next offset
 is no longer valid. For the compacted section the offsets are not increasing
 sequentially. I think you need to look up the next offset after the last
 processed record to figure out what the next offset will be

 On Wed, 29 Jul 2015 at 06:16 Stevo Slavić ssla...@gmail.com wrote:

  Hello Jason,
 
  Thanks for reply!
 
  About your proposal, in general case it might be helpful. In my case it
  will not help much - I'm allowing each ConsumerRecord or subset of
  ConsumerRecords to be processed and ACKed independently and out of HLC
  process/thread (not to block partition), and then committing largest
  consecutive ACKed processed offset (+1) since current last committed
 offset
  per partition.
 
  Kind regards,
  Stevo Slavic.
 
  On Mon, Jul 27, 2015 at 6:52 PM, Jason Gustafson ja...@confluent.io
  wrote:
 
   Hey Stevo,
  
   I agree that it's a little unintuitive that what you are committing is
  the
   next offset that should be read from and not the one that has already
  been
   read. We're probably constrained in that we already have a consumer
 which
   implements this behavior. Would it help if we added a method on
   ConsumerRecords to get the next offset (e.g. nextOffset(partition))?
  
   Thanks,
   Jason
  
   On Fri, Jul 24, 2015 at 10:11 AM, Stevo Slavić ssla...@gmail.com
  wrote:
  
Hello Apache Kafka community,
   
Say there is only one topic with single partition and a single
 message
  on
it.
Result of calling a poll with new consumer will return ConsumerRecord
  for
that message and it will have offset of 0.
   
After processing message, current KafkaConsumer implementation
 expects
   one
to commit not offset 0 as processed, but to commit offset 1 - next
offset/position one would like to consume.
   
Does this sound strange to you as well?
   
Wondering, couldn't this offset+1 handling for the next position to read be
done in one place, in the KafkaConsumer implementation or broker or whatever,
instead of every user of KafkaConsumer having to do it?
   
Kind regards,
Stevo Slavic.
   
  
 



Re: New producer in production

2015-07-17 Thread Jay Kreps
Hey Sivananda,

That's actually no longer true and likely a documentation bug. Where did
you see that?

-Jay

On Fri, Jul 17, 2015 at 9:35 AM, Sivananda Reddy sivananda2...@gmail.com
wrote:

 Hi,

 Kafka documentation says that the new producer is in Beta state; how safe is
 it to use the new producer in production? This is the first time I am
 using Kafka for my application messaging needs. Please let me know.

 Thank you,
 Siva.



Re: stunning error - Request of length 1550939497 is not valid, it is larger than the maximum size of 104857600 bytes

2015-07-14 Thread Jay Kreps
This is almost certainly a client bug. Kafka's request format is
size-delimited messages in the form
   <4 byte size N><N byte payload>
If the client sends a request with an invalid size or sends a partial
request the server will see effectively random bytes from the next request
as the size of the next message and generally reject the request (or fail
to parse it).
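
A minimal sketch of size-delimited framing and of how a misaligned read produces a nonsense length. It assumes a big-endian 4-byte size prefix; the class and method names are hypothetical, and the 104857600 limit is the one from the broker log in this thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SizeDelimited {
    static final int MAX_REQUEST_SIZE = 104857600; // limit quoted in the broker log

    // Prefix the payload with its length as a 4-byte big-endian integer.
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length); // ByteBuffer is big-endian by default
        buf.put(payload);
        return buf.array();
    }

    // Read the 4-byte size found at the given position and validate it.
    static int readSize(byte[] stream, int position) {
        int size = ByteBuffer.wrap(stream, position, 4).getInt();
        if (size < 0 || size > MAX_REQUEST_SIZE) {
            throw new IllegalArgumentException("Request of length " + size + " is not valid");
        }
        return size;
    }

    public static void main(String[] args) {
        byte[] stream = frame("hello".getBytes(StandardCharsets.US_ASCII));
        System.out.println("aligned read: " + readSize(stream, 0));
        try {
            // Reading 4 bytes too late treats payload bytes as the size.
            readSize(stream, 4);
        } catch (IllegalArgumentException e) {
            System.out.println("misaligned read: " + e.getMessage());
        }
    }
}
```

Once the reader and writer disagree about where a frame starts, whatever four bytes come next are interpreted as the length, which is how a small payload can show up as a gigabyte-sized request.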

-Jay

On Sat, Jul 11, 2015 at 9:08 PM, David Montgomery davidmontgom...@gmail.com
 wrote:

 I can't send this simple payload using python.

 topic: topic-test-development
 payload: {utcdt: 2015-07-12T03:59:36, ghznezzhmx: apple}


 No handlers could be found for logger "kafka.conn"
 Traceback (most recent call last):
   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line 81, in <module>
     test_send_data_to_realtimenode()
   File "/home/ubuntu/workspace/feed-tests/tests/druid-adstar.py", line 38, in test_send_data_to_realtimenode
     response = producer.send_messages(test_topic,test_payload)
   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/simple.py", line 54, in send_messages
     topic, partition, *msg
   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 349, in send_messages
     return self._send_messages(topic, partition, *msg)
   File "/usr/local/lib/python2.7/dist-packages/kafka/producer/base.py", line 390, in _send_messages
     fail_on_error=self.sync_fail_on_error
   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 480, in send_produce_request
     (not fail_on_error or not self._raise_on_response_error(resp))]
   File "/usr/local/lib/python2.7/dist-packages/kafka/client.py", line 247, in _raise_on_response_error
     raise resp
 kafka.common.FailedPayloadsError

 Here is what is in my logs
 [2015-07-12 03:29:58,103] INFO Closing socket connection to
 /xxx.xxx.xxx.xxx due to invalid request: Request of length 1550939497 is
 not valid, it is larger than the maximum size of 104857600 bytes.
 (kafka.network.Processor)



 Server has 4 gigs of RAM.

 I used export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M" in kafka-server-start.sh

 So... why?



Re: Using Kafka as a persistent store

2015-07-10 Thread Jay Kreps
If I recall correctly, setting log.retention.ms and log.retention.bytes to
-1 disables both.

On Fri, Jul 10, 2015 at 1:55 PM, Daniel Schierbeck 
daniel.schierb...@gmail.com wrote:


  On 10. jul. 2015, at 15.16, Shayne S shaynest...@gmail.com wrote:
 
  There are two ways you can configure your topics, log compaction and with
  no cleaning. The choice depends on your use case. Are the records
 uniquely
  identifiable and will they receive updates? Then log compaction is the
 way
  to go. If they are truly read only, you can go without log compaction.

 I'd rather be free to use the key for partitioning, and the records are
 immutable — they're event records — so disabling compaction altogether
 would be preferable. How is that accomplished?
 
  We have small processes which consume a topic and perform upserts to our
  various database engines. It's easy to change how it all works and simply
  consume the single source of truth again.
 
  I've written a bit about log compaction here:
  http://www.shayne.me/blog/2015/2015-06-25-everything-about-kafka-part-2/
 
  On Fri, Jul 10, 2015 at 3:46 AM, Daniel Schierbeck 
  daniel.schierb...@gmail.com wrote:
 
  I'd like to use Kafka as a persistent store – sort of as an alternative
 to
  HDFS. The idea is that I'd load the data into various other systems in
  order to solve specific needs such as full-text search, analytics,
 indexing
  by various attributes, etc. I'd like to keep a single source of truth,
  however.
 
  I'm struggling a bit to understand how I can configure a topic to retain
  messages indefinitely. I want to make sure that my data isn't deleted.
 Is
  there a guide to configuring Kafka like this?
 



Re: Kafka as an event store for Event Sourcing

2015-06-12 Thread Jay Kreps
I have been thinking a little about this. I don't think CAS actually
requires any particular broker support. Rather the two writers just write
messages with some deterministic check-and-set criteria and all the
replicas read from the log and check this criteria before applying the
write. This mechanism has the downside that it creates additional writes
when there is a conflict and requires waiting on the full roundtrip (write
and then read) but it has the advantage that it is very flexible as to the
criteria you use.
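
A minimal sketch of this first approach, with no broker support: every write in the log carries the version it expects, and each replica replays the log and applies a write only if the check passes. The version-counter criterion and all names here are assumptions for illustration; the criteria could be anything deterministic.

```java
import java.util.Arrays;
import java.util.List;

class LogCas {
    // A write that should take effect only if the replica's current
    // version equals expectedVersion when the write is read from the log.
    static final class ConditionalWrite {
        final long expectedVersion;
        final String value;

        ConditionalWrite(long expectedVersion, String value) {
            this.expectedVersion = expectedVersion;
            this.value = value;
        }
    }

    // Replica state built purely by replaying the log in order.
    static final class Replica {
        long version = 0;
        String value = null;

        // Returns true if the write was applied, false if it lost the race.
        boolean apply(ConditionalWrite w) {
            if (w.expectedVersion != version) {
                return false;
            }
            value = w.value;
            version++;
            return true;
        }
    }

    static Replica replay(List<ConditionalWrite> log) {
        Replica replica = new Replica();
        for (ConditionalWrite w : log) {
            replica.apply(w);
        }
        return replica;
    }

    public static void main(String[] args) {
        // Two writers race to book the same seat; both expect version 0.
        List<ConditionalWrite> log = Arrays.asList(
            new ConditionalWrite(0, "seat 12A -> alice"),
            new ConditionalWrite(0, "seat 12A -> bob"));
        Replica a = replay(log);
        Replica b = replay(log);
        System.out.println(a.value + " | " + b.value);
    }
}
```

Because the check is deterministic and the log order is the same for everyone, every replica rejects the same losing write. The losing writer only learns the outcome after reading its own message back, which is the extra roundtrip mentioned above.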

An alternative strategy for accomplishing the same thing a bit more
efficiently is to elect leaders amongst the writers themselves. This would
require broker support for single writer to avoid the possibility of split
brain. I like this approach better because the leader for a partition can
then do anything they want on their local data to make the decision of what
is committed, however the downside is that the mechanism is more involved.

-Jay

On Fri, Jun 12, 2015 at 6:43 AM, Ben Kirwin b...@kirw.in wrote:

 Gwen: Right now I'm just looking for feedback -- but yes, if folks are
 interested, I do plan to do that implementation work.

 Daniel: Yes, that's exactly right. I haven't thought much about
 per-key... it does sound useful, but the implementation seems a bit
 more involved. Want to add it to the ticket?

 On Fri, Jun 12, 2015 at 7:49 AM, Daniel Schierbeck
 daniel.schierb...@gmail.com wrote:
  Ben: your solution seems to focus on partition-wide CAS. Have you
  considered per-key CAS? That would make the feature more useful in my
  opinion, as you'd greatly reduce the contention.
 
  On Fri, Jun 12, 2015 at 6:54 AM Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Hi Ben,
 
  Thanks for creating the ticket. Having check-and-set capability will be
  sweet :)
  Are you planning to implement this yourself? Or is it just an idea for
  the community?
 
  Gwen
 
  On Thu, Jun 11, 2015 at 8:01 PM, Ben Kirwin b...@kirw.in wrote:
   As it happens, I submitted a ticket for this feature a couple days
 ago:
  
   https://issues.apache.org/jira/browse/KAFKA-2260
  
   Couldn't find any existing proposals for similar things, but it's
   certainly possible they're out there...
  
   On the other hand, I think you can solve your particular issue by
   reframing the problem: treating the messages as 'requests' or
   'commands' instead of statements of fact. In your flight-booking
   example, the log would correctly reflect that two different people
   tried to book the same flight; the stream consumer would be
   responsible for finalizing one booking, and notifying the other client
   that their request had failed. (In-browser or by email.)
  
   On Wed, Jun 10, 2015 at 5:04 AM, Daniel Schierbeck
   daniel.schierb...@gmail.com wrote:
   I've been working on an application which uses Event Sourcing, and
 I'd
  like
   to use Kafka as opposed to, say, a SQL database to store events. This
  would
   allow me to easily integrate other systems by having them read off
 the
   Kafka topics.
  
   I do have one concern, though: the consistency of the data can only
 be
   guaranteed if a command handler has a complete picture of all past
  events
   pertaining to some entity.
  
   As an example, consider an airline seat reservation system. Each
   reservation command issued by a user is rejected if the seat has already
   been taken. If the seat is available, a record describing the event is
   appended to the log. This works great when there's only one producer, but
   in order to scale I may need multiple producer processes. This introduces a
   race condition: two command handlers may simultaneously receive a command
   to reserve the same seat. The event log indicates that the seat is
   available, so each handler will append a reservation event – thus
   double-booking that seat!
  
   I see three ways around that issue:
   1. Don't use Kafka for this.
   2. Force a single producer for a given flight. This will impact
   availability and make routing more complex.
   3. Have a way to do optimistic locking in Kafka.
  
   The latter idea would work either on a per-key basis or globally for
 a
   partition: when appending to a partition, the producer would
 indicate in
   its request that the request should be rejected unless the current
  offset
   of the partition is equal to x. For the per-key setup, Kafka brokers
  would
   track the offset of the latest message for each unique key, if so
   configured. This would allow the request to specify that it should be
   rejected if the offset for key k is not equal to x.
  
   This way, only one of the command handlers would succeed in writing
 to
   Kafka, thus ensuring consistency.
  
   There are different levels of complexity associated with implementing
  this
   in Kafka depending on whether the feature would work per-partition or
   per-key:
   * For the per-partition optimistic locking, the broker would just
 need
  to
   keep track of the 

Re: High CPU usage for idle kafka server

2015-06-08 Thread Jay Kreps
Could it also be that the log cleaner is running? This will definitely use
some CPU while the cleaning is occurring (it would attempt to use one cpu
per log cleaner thread you configure).

-Jay

On Mon, Jun 8, 2015 at 1:07 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 It seems the attachments are lost. But high CPU for ReplicaFetcherThread
 might be related to KAFKA-1461. Can you try to apply that patch and see if
 it solves the problem?

 Thanks,

 Jiangjie (Becket) Qin

 From: pundlik.anuja <pundlik.an...@gmail.com>
 Reply-To: users@kafka.apache.org
 Date: Monday, June 8, 2015 at 12:20 PM
 To: users@kafka.apache.org
 Subject: Re: High CPU usage for idle kafka server

 Seems to be LogCleaner and FetchRequest threads.
 Attached is the profiling screenshot

 On Fri, Jun 5, 2015 at 3:06 PM, Jiangjie Qin j...@linkedin.com.invalid
 mailto:j...@linkedin.com.invalid wrote:
 Has this to do with KAFKA-1461?
 Can you see which thread is taking a lot of cpu? Some jconsole plugin can
 get that information.

 Jiangjie (Becket) Qin

 On 6/5/15, 2:57 PM, pundlik.anuja pundlik.an...@gmail.commailto:
 pundlik.an...@gmail.com wrote:

 Hi Jay,
 
 Good to hear from you. I met you at the kafka meetup at linkedin.
 
 - No, I am running kafka_2.11-0.8.2.1
 
 
 Are there any logs/ any info that I can provide that will help you
 understand what could be the issue?
 
 Thanks,
 Anuja
 
 On Fri, Jun 5, 2015 at 2:36 PM, Jay Kreps jay.kr...@gmail.commailto:
 jay.kr...@gmail.com wrote:
 
  This sounds a lot like a bug we fixed in 0.8.2.0, no chance you are
 running
  that pre-release version is there?
 
  -Jay
 
  On Wed, Jun 3, 2015 at 9:43 PM, Anuja Pundlik (apundlik) 
  apund...@cisco.commailto:apund...@cisco.com
   wrote:
 
   Hi,
  
   I am using Kafka 0.8.2.1.
   We have 1 zookeeper, 3 kafka brokers.
   We have 9 topics, out of which 1 topic has 18 partitions, while
 another
   has 12 partitions. All other topics have 1 partition each.
  
   We see that idle kafka brokers (not carrying any message) are using
 more
   than 50% of CPU. See top output below.
  
   Is this a known issue?
  
  
   Thanks
  
  
  
   top - 04:42:30 up  2:07,  1 user,  load average: 1.50, 1.31, 0.92
   Tasks: 177 total,   1 running, 176 sleeping,   0 stopped,   0 zombie
   Cpu(s): 13.5%us,  4.5%sy,  0.0%ni, 81.3%id,  0.2%wa,  0.0%hi,  0.1%si,  0.4%st
   Mem:  65974296k total, 22310524k used, 43663772k free,   112688k buffers
   Swap:        0k total,        0k used,        0k free, 13382460k cached
  
     PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
    9295 wae       20   0 5212m 894m  12m S   62  1.4  22:50.99 java
    9323 wae       20   0 5502m 894m  12m S   56  1.4  24:28.69 java
    9353 wae       20   0 5072m 896m  12m S   54  1.4  17:04.31 java
  
 





Re: High CPU usage for idle kafka server

2015-06-05 Thread Jay Kreps
This sounds a lot like a bug we fixed in 0.8.2.0. There's no chance you are
running a pre-release version, is there?

-Jay

On Wed, Jun 3, 2015 at 9:43 PM, Anuja Pundlik (apundlik) apund...@cisco.com
 wrote:

 Hi,

 I am using Kafka 0.8.2.1.
 We have 1 zookeeper, 3 kafka brokers.
 We have 9 topics, out of which 1 topic has 18 partitions, while another
 has 12 partitions. All other topics have 1 partition each.

 We see that idle kafka brokers (not carrying any message) are using more
 than 50% of CPU. See top output below.

 Is this a known issue?


 Thanks



 top - 04:42:30 up  2:07,  1 user,  load average: 1.50, 1.31, 0.92
 Tasks: 177 total,   1 running, 176 sleeping,   0 stopped,   0 zombie
 Cpu(s): 13.5%us,  4.5%sy,  0.0%ni, 81.3%id,  0.2%wa,  0.0%hi,  0.1%si,  0.4%st
 Mem:  65974296k total, 22310524k used, 43663772k free,   112688k buffers
 Swap:        0k total,        0k used,        0k free, 13382460k cached

   PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
  9295 wae       20   0 5212m 894m  12m S   62  1.4  22:50.99 java
  9323 wae       20   0 5502m 894m  12m S   56  1.4  24:28.69 java
  9353 wae       20   0 5072m 896m  12m S   54  1.4  17:04.31 java



Re: consumer poll returns no records unless called more than once, why?

2015-05-20 Thread Jay Kreps
Hey Ben,

The consumer actually doesn't promise to return records on any given poll()
call, and even in trunk it likely won't return records on the first call.

Internally the reason is that it basically does one or two rounds of
non-blocking actions and then returns. This could include things like
communicating with the co-ordinator, establishing connections, sending
fetch requests, etc.

I guess the question is whether this behavior is confusing or not. In
general there is no guarantee that you will have data ready, or that if you
do you will be assigned a partition to consume from within your timeout. So
assuming that poll will always return data is wrong.

However with a little effort we could potentially wrap the poll call so
that rather than return it would always attempt to wait the full timeout
potentially doing multiple internal polls. This doesn't guarantee it would
return data but would reduce the likelihood when data was ready.
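
A minimal sketch of such a wrapper, written against a hypothetical Poller interface rather than the real KafkaConsumer so that it is self-contained. It keeps polling until records arrive or the full timeout has elapsed; as noted, it still cannot guarantee that data is returned.

```java
import java.util.Collections;
import java.util.List;

class PollUntil {
    // Hypothetical stand-in for a consumer whose poll() may return early
    // with no records even though the timeout has not elapsed.
    interface Poller<T> {
        List<T> poll(long timeoutMs);
    }

    // Poll repeatedly until records arrive or the overall timeout expires.
    // Always polls at least once, even with a timeout of 0.
    static <T> List<T> pollFully(Poller<T> poller, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        long remainingMs = timeoutMs;
        do {
            List<T> records = poller.poll(Math.max(remainingMs, 0));
            if (!records.isEmpty()) {
                return records;
            }
            remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
        } while (remainingMs > 0);
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // Simulated consumer: the first two polls do internal work only.
        int[] calls = {0};
        Poller<String> flaky = timeout ->
            ++calls[0] < 3 ? Collections.<String>emptyList()
                           : Collections.singletonList("record-0");
        System.out.println(pollFully(flaky, 1000) + " after " + calls[0] + " polls");
    }
}
```

The caller sees a single call that either returns data or uses up its timeout, at the cost of hiding the internal rounds of connection setup and coordination.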

I'm not sure if that is actually a good idea vs just documenting this a
little better in the javadoc.

-Jay

On Wed, May 20, 2015 at 10:12 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Ben,

 This Java consumer client was still not mature in 0.8.2.0 and lots of bug
 fixes have been checked in since then.

 I just tested your code with trunk's consumer and it does not exhibit this
 problem. Could you try the same on your side and see if this issue goes
 away?

 Guozhang

 On Wed, May 20, 2015 at 9:49 AM, Padgett, Ben bpadg...@illumina.com
 wrote:

  I am using Kafka v0.8.2.0
  
  From: Guozhang Wang [wangg...@gmail.com]
  Sent: Wednesday, May 20, 2015 9:41 AM
  To: users@kafka.apache.org
  Subject: Re: consumer poll returns no records unless called more than
  once, why?
 
  Hello Ben,
 
  Which version of Kafka are you using with this consumer client?
 
  Guozhang
 
  On Wed, May 20, 2015 at 9:03 AM, Padgett, Ben bpadg...@illumina.com
  wrote:
 
   //this code
  
   Properties consumerProps = new Properties();
   consumerProps.put("bootstrap.servers", "localhost:9092");
  
  
   //without deserializer it fails, which makes sense. the
   //documentation however doesn't show this
   consumerProps.put("key.deserializer",
       "org.apache.kafka.common.serialization.StringDeserializer");
   consumerProps.put("value.deserializer",
       "org.apache.kafka.common.serialization.StringDeserializer");
  
  
   //why is serializer required? without this it fails to return
   //results when calling poll
   consumerProps.put("key.serializer",
       "org.apache.kafka.common.serializers.StringSerializer");
   consumerProps.put("value.serializer",
       "org.apache.kafka.common.serializers.StringSerializer");
  
  
   consumerProps.put("group.id", "test");
   consumerProps.put("enable.auto.commit", "true");
   consumerProps.put("auto.commit.interval.ms", "1000");
   consumerProps.put("session.timeout.ms", "3");
  
   org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer =
       new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(consumerProps);
  
   TopicPartition topicPartition = new TopicPartition("project-created", 0);
   consumer.subscribe(topicPartition);
  
   consumer.seekToBeginning(topicPartition);
  
   //each scenario code goes here
  
  
  
   I have a scenario where it returns records and a scenario where no
   records are returned. Could anyone provide insight on why this happens?
  
  
  
   //without a loop consumer.poll(100) returns no records
   //after poll is called a second time it returns records
   boolean run = true;
   while (run) {
       ConsumerRecords<String, String> records = consumer.poll(100);
   }
  
  
  
   //why would this return zero records?
   ConsumerRecords<String, String> records = consumer.poll(100);
  
  
   //This is to show that there are records for topic project-created
  
   bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic
   project-created --from-beginning
   3a543cf4-13e4-42b6-8c72-54b733228c75
   c298d41e-4ae5-4f4f-933e-f745969aaf98
   2036ef69-9694-4ee8-a40c-50cddf982edb
   08d90698-8c75-4c4b-8ce6-b0b12d3de0d6
   249d0e8c-cd38-41b0-ae7f-bdfa7fa8f76e
   84d87e45-cd10-4797-bfe3-d540e26e56cc
   a8c74f78-cb7a-4c74-873b-a0e587af646c
   42eeff6e-22fd-40c4-96b4-91a3df356ec7
   bef622b6-3ac1-489b-9837-ab9c2b0399d1
   03e9d567-fadf-46dc-8097-4bc327a0942e
   df5d93d6-30de-494a-8d82-45cd8b4fc785
   eb5a194f-083d-4a0f-bbb7-155056acd929
   192dfbb7-4f4f-4550-a1b4-ab9bc83be825
   b7845e7e-b477-476d-b115-b0a9d076c52e
   f8ea2d0f-bc76-44a8-86ef-2b3e0adf7755
   8ee28e0c-5a8e-47c8-a939-f5dc6d7be3f9
   fbe30ae3-c383-4e27-8f70-f3c63ed82fed
   50fa9166-cc0d-4d12-8d86-c922062519cd
   50ceb437-2556-4a9a-8a13-93bfc130e914
   f1f5ad0b-7739-47fd-be57-ac093f728004
   

Re: OutOfMemory error on broker when rolling logs

2015-05-13 Thread Jay Kreps
I think java.lang.OutOfMemoryError: Map failed has usually been out of
address space for mmap if memory serves.

If you sum the length of all .index files while the service is running (not
after stopped), do they sum to something really close to 2GB? If so it is
likely either that the OS/arch is 32 bit (which on slack you said it
wasn't) or possibly the jvm is in 32 bit mode?
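
One way to do that sum is sketched below. It assumes the broker's log directory is passed as the first argument (the path is whatever your log.dirs points at) and simply adds up the lengths of all files ending in .index; the class name is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

class IndexFileTotal {
    // Sum the lengths of all *.index files under the given directory.
    static long totalIndexBytes(Path logDir) {
        try (Stream<Path> paths = Files.walk(logDir)) {
            return paths
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".index"))
                .mapToLong(p -> p.toFile().length())
                .sum();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Defaults to the current directory if no log dir is given.
        Path logDir = Paths.get(args.length > 0 ? args[0] : ".");
        long total = totalIndexBytes(logDir);
        System.out.println(total + " bytes in .index files (2GB = " + (1L << 31) + ")");
    }
}
```

Run it while the broker is up, since that is when the index files are mapped at their full size.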

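If you want to debug, the easiest test would be a simple program that did
something like:

import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;

public class MmapTest {
  public static void main(String[] args) throws Exception {
    // FileChannel.map() accepts at most Integer.MAX_VALUE bytes per mapping,
    // and 2*1024*1024*1024 overflows int, so use a long just under 2GB.
    long size = Integer.MAX_VALUE;
    RandomAccessFile raf1 = new RandomAccessFile("test-file-1", "rw");
    RandomAccessFile raf2 = new RandomAccessFile("test-file-2", "rw");
    raf1.setLength(size);
    raf2.setLength(size);
    MappedByteBuffer b1 = raf1.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size);
    MappedByteBuffer b2 = raf2.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, size);
  }
}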

If you compile this and run with the same options you're running kafka with
it should succeed but if it fails with the same error that is the address
space limit for 32 bits kicking in.

-Jay

On Wed, May 13, 2015 at 4:24 PM, Jeff Field jvfi...@blizzard.com wrote:

 Hello,
 We are doing a Kafka POC on our CDH cluster. We are running 3 brokers with
 24TB (48TB Raw) of available RAID10 storage (XFS filesystem mounted with
 nobarrier/largeio) (HP Smart Array P420i for the controller, latest
 firmware) and 48GB of RAM. The broker is running with -Xmx4G -Xms4G
 -server -XX:+UseParNewGC -XX:+UseConcMarkSweepGC
 -XX:+CMSClassUnloadingEnabled -XX:+CMSScavengeBeforeRemark
 -XX:+DisableExplicitGC. This is on RHEL 6.6 with the
 2.6.32-504.8.1.el6.x86_64 kernel. JDK is jdk1.7.0_67 64-bit. We were using
 the 1.2.0 version of the Cloudera Kafka 0.8.2.0 build. We are upgrading to
 1.3.0 after the RAID testing, but none of the fixes they included in 1.3.0
 seem to be related to what we're seeing.

 We are using a custom producer to push copies of real messages from our
 existing messaging system onto Kafka in order to test ingestion rates and
 compression ratios. After a couple of hours (during which we push about 4.3
 billion messages, ~2.2 terabytes before replication), one of our brokers will fail
 with an I/O error (2 slightly different ones so far) followed by a memory
 error. We're currently doing stress testing on the arrays (write/verify
 with IOzone set for 24 threads), but assuming the tests don't find anything
 on IO, what could cause this? Errors are included below.

 Thanks,
 -Jeff

 Occurrence 1:
 2015-05-12 03:55:08,291 FATAL kafka.server.KafkaApis: [KafkaApi-834]
 Halting due to unrecoverable I/O error while handling produce request:
 kafka.common.KafkaStorageException: I/O exception in append to log
 'TEST_TOPIC-1'
 at kafka.log.Log.append(Log.scala:266)
 at
 kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
 at
 kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
 at kafka.utils.Utils$.inLock(Utils.scala:561)
 at kafka.utils.Utils$.inReadLock(Utils.scala:567)
 at
 kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
 at
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
 at
 kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
 at
 scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
 at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
 at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
 at
 kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
 at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
 at
 kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.IOException: Map failed
 at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:888)
 at kafka.log.OffsetIndex.<init>(OffsetIndex.scala:74)
 at kafka.log.LogSegment.<init>(LogSegment.scala:57)
 at kafka.log.Log.roll(Log.scala:565)
 at kafka.log.Log.maybeRoll(Log.scala:539)
 at kafka.log.Log.append(Log.scala:306)
 ... 21 more
 Caused by: java.lang.OutOfMemoryError: Map failed
 at sun.nio.ch.FileChannelImpl.map0(Native 

Re: Experiences testing new producer performance across multiple threads/producer counts

2015-05-13 Thread Jay Kreps
Hey Garry,

Super interesting. We honestly never did a ton of performance tuning on the
producer. I checked the profiles early on in development and we fixed a few
issues that popped up in deployment, but I don't think anyone has done a
really scientific look. If you (or anyone else) want to dive into things I
suspect it could be improved.

Becket is exactly right. There are two possible bottlenecks you can hit in
the producer--the single background sender thread and the per-partition
lock. You can check utilization on the background thread with jvisualvm
(it's named something like kafka-producer-network-thread). The locking is
fairly hard to improve.
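
If attaching jvisualvm is inconvenient, a rough in-process alternative is to
read per-thread CPU time from the JVM's ThreadMXBean for threads whose name
contains the producer network thread name. This is a sketch only: the class
name ThreadCpu is made up, and the name fragment you pass in is an assumption
based on the "something like kafka-producer-network-thread" above.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka.
final class ThreadCpu {
    // CPU time in nanoseconds for each live thread whose name contains nameFragment.
    static Map<String, Long> cpuNanosByThread(String nameFragment) {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        Map<String, Long> result = new LinkedHashMap<>();
        if (!mx.isThreadCpuTimeSupported()) {
            return result; // this JVM cannot measure per-thread CPU time
        }
        for (long id : mx.getAllThreadIds()) {
            ThreadInfo info = mx.getThreadInfo(id);
            if (info == null || !info.getThreadName().contains(nameFragment)) {
                continue; // thread has died or does not match
            }
            long cpu = mx.getThreadCpuTime(id); // -1 if unavailable for this thread
            if (cpu >= 0) {
                result.put(info.getThreadName(), cpu);
            }
        }
        return result;
    }
}
```

Sampling this twice and dividing the CPU-time delta by the elapsed wall-clock
time gives an approximate utilization for the sender thread.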

It's a little surprising that adding partitions caused a large decrease in
performance. Generally this is only the case if you override the flush
settings on the broker to force fsync more frequently.

The ISR issues under heavy load are probably fixable, the issue is
discussed a bit here:
http://blog.confluent.io/2015/04/07/hands-free-kafka-replication-a-lesson-in-operational-simplicity/

The producer settings that may matter for performance are:
acks
batch.size (though beyond 32k I didn't see much improvement)
linger.ms (setting = 1 may help a bit)
send.buffer.bytes (maybe, but probably not)
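
As a starting point for experimenting with those four settings, something like
the following could be used. It is only a sketch: the class name ProducerTuning
is made up, the values are placeholders rather than recommendations, and a real
producer would also need bootstrap.servers and serializers configured.

```java
import java.util.Properties;

// Hypothetical helper; only the four config keys come from the list above.
final class ProducerTuning {
    static Properties tuningProps() {
        Properties props = new Properties();
        props.put("acks", "1");                   // placeholder value
        props.put("batch.size", "32768");         // 32k, beyond which little gain was seen
        props.put("linger.ms", "1");              // "setting = 1 may help a bit"
        props.put("send.buffer.bytes", "131072"); // placeholder value, probably not important
        return props;
    }
}
```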

Cheers,

-Jay

On Wed, May 13, 2015 at 3:42 PM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Thanks for sharing this, Garry. I actually did similar tests before but
 unfortunately lost the test data because my laptop rebooted and I forgot
 to save the data...

 Anyway, several things to verify:

 1. Remember that KafkaProducer holds a lock per partition, so if you have
 only one partition in the target topic and many application threads, lock
 contention could be an issue.

 2. It matters how frequently the sender thread wakes up and runs. You
 can take a look at the following sensors to further verify whether the
 sender thread has really become a bottleneck or not:
 select-rate
 io-wait-time-ns-avg
 io-time-ns-avg

 3. Batch size matters, so take a look at the sensor batch-size-avg and see
 if the average batch size makes sense or not.

 Looking forward to your further profiling. My thinking is that unless you
 are sending very small messages to a small number of partitions, you don't
 need to worry about using more than one producer.

 Thanks.

 Jiangjie (Becket) Qin



 On 5/13/15, 2:40 PM, Garry Turkington g.turking...@improvedigital.com
 wrote:

 Hi,
 
 I talked with Gwen at Strata last week and promised to share some of my
 experiences benchmarking an app reliant on the new  producer. I'm using
 relatively meaty boxes running my producer code (24 core/64GB RAM) but I
 wasn't pushing them until I got them on the same 10GB fabric as the Kafka
 cluster they are using (saturating the prior 1GB NICs was just too easy).
 There are 5 brokers, 24 core/192GB RAM/8*2TB disks, running 0.8.2.1.
 
 With lots of cores and a dedicated box the question was then how to
 deploy my application. In particular how many worker threads and how many
 instances of the KafkaProducer  to share amongst them. I also wanted to
 see how things would change as I scale up the thread count.
 
 I ripped out the data retrieval part of my app (it reads from S3) and
 instead replaced it with some code to produce random records of average
 size 500 bytes but varying between 250 and 750. I started the app
 running, ignored the first 25m messages then measured the timing for the
 next 100m and  calculated the average messages/sec written to Kafka
 across that run.
 
 Starting small I created 4 application threads with a range of approaches
 to sharing KafkaProducer instances. The records written to the Kafka
 cluster per second were as follows:
 
 4 threads all sharing 1 client: 332K
 4 threads sharing 2 clients: 333K
 4 threads, dedicated client per thread: 310K
 
 Note that when I had 2 KafkaProducer clients as in the second line above
 each was used by 2 threads. Similar below, number of threads/number of
 clients is the max number of threads per KafkaProducer instance.
 
 As can be seen from the above there's not much in it. Scaling up to 8
 application threads the numbers  looked like:
 
 8 threads sharing 1 client: 387K
 8 threads sharing 2 clients: 609K
 8 threads sharing 4 clients: 628K
 8 threads with dedicated  client per thread: 527K
 
 This time sharing a single producer client across all threads has by far
 the worst performance and isn't much better than when using 4
 application threads. The 2 and 4 client options are much better and are
 in the ballpark of 2x the 4 thread performance. A dedicated client per
 thread isn't quite as good but isn't so far off to be unusable. So then
 taking it to 16 application threads:
 
 16 threads sharing 1 client: 380K
 16 threads sharing 2 clients: 675K
 16 threads sharing 4 clients: 869K
 16 threads sharing 8 clients: 733K
 16 threads  with a dedicated client per thread: 491K
 
 This gives a much clearer performance curve. The 16 thread/4 producer
 

Re: circuit breaker for producer

2015-05-05 Thread Jay Kreps
Does block.on.buffer.full=false do what you want?

-Jay

On Tue, May 5, 2015 at 1:59 AM, mete efk...@gmail.com wrote:

 Hello Folks,

 I was looking through the kafka.producer metrics on the JMX interface, to
 find a good indicator when to trip the circuit. So far it seems like the
 bufferpool-wait-ratio metric is a useful decision mechanism when to cut
 off the production to kafka.

 As far as I have experienced, when the Kafka server is slow for some reason,
 requests start piling up on the producer queue and, if you are not willing to
 drop any messages on the producer, the send method starts blocking because of
 the slow responsiveness.

 So this buffer pool wait ratio starts going up from 0.x up to 1.0. And I am
 thinking about tripping the circuit breaker using this metric, e.g. if
 wait-ratio > 0.90 etc...

 What do you think? Do you think there would be a better indicator to check
 the health overall?

 Best
 Mete
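
The wait-ratio idea in the message above can be sketched roughly as follows.
The class name WaitRatioBreaker is made up, and the DoubleSupplier is a
stand-in for however you read bufferpool-wait-ratio (for example from JMX);
nothing here is an existing Kafka API.

```java
import java.util.function.DoubleSupplier;

// Hypothetical circuit breaker driven by a wait-ratio reading.
final class WaitRatioBreaker {
    private final DoubleSupplier waitRatio; // assumed to return a value between 0.0 and 1.0
    private final double threshold;

    WaitRatioBreaker(DoubleSupplier waitRatio, double threshold) {
        this.waitRatio = waitRatio;
        this.threshold = threshold;
    }

    // True when the latest reading is above the threshold,
    // i.e. the caller should stop producing for now.
    boolean isOpen() {
        return waitRatio.getAsDouble() > threshold;
    }
}
```

A caller would check isOpen() before each send and take its fallback path
while the breaker is open.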



Re: New Producer API - batched sync mode support

2015-05-02 Thread Jay Kreps
Gwen, I don't care what anyone says I think we are totally stlone cold
slobar. :-)

I think the only caution I would have is that in general people ask for
many things and yet the systems we all admire tend to keep their surface
area really really simple. My observation is that never in the history of
working on open source has anyone ever asked for simplicity or agitated for
removing features, but people really do value that. So I think it is worth
trying to really get down to the core problem that the api solves and avoid
adding to it unless there is a really clear case.

Here are the things I have understood:

a. The performance of batching manually could be better due to locking
around a batch. This is possible, but I think it would be good to do a
quick measurement between the new and old producer and see if this really
plays out or not and the magnitude of the performance improvement we could
achieve. There were several other perf arguments aside from locking that
seemed unlikely to me, but I think a quick measurement could clear all this
up.

b. It would be nice to have some batch-level atomicity. I agree, but I
think this is the cross-partition transaction work. Batching can't really
guarantee this (and didn't before) and I think this is one where getting
almost what you need (but not quite working) is worse than nothing cause
you can't depend on it.

c. The code for looping over responses is annoying. I think this is true,
but I think if you want to give back the offset and error per message you
kind of end up with something like the futures. You could imagine some api
that sends a list of messages and returns a Map of errors or something but
it is a little special purpose since if you don't care about the error you
don't need any special api and if you care about the offset that won't
help...so I think to do this really well we need to maybe write down the N
patterns of producer usage and see which ones we can improve with a new api.

For what it is worth I think a lot of this is just because people were used
to the scala API. However the scala api also caused endless confusion
because of the weird mixture of manual and automatic batching (what is the
difference? when to use one or the other? how do they interact? etc.).

-Jay

On Wed, Apr 29, 2015 at 6:08 PM, Gwen Shapira gshap...@cloudera.com wrote:

 I'm starting to think that the old adage "If two people say you are drunk,
 lie down" applies here :)

 Current API seems perfectly clear, useful and logical to everyone who wrote
 it... but we are getting multiple users asking for the old batch behavior
 back.
 One reason to get it back is to make upgrades easier - people won't need to
 rethink their existing logic if they get an API with the same behavior in
 the new producer. The other reason is what Ewen mentioned earlier - if
 everyone re-implements Joel's logic, we can provide something for that.

 How about getting the old batch send behavior back by adding a new API
 with:
 public void batchSend(List<ProducerRecord<K,V>>)

 With this implementation (mixes the old behavior with Joel's snippet):
 * send records one by one
 * flush
 * iterate on futures and get them
 * log a detailed message on each error
 * throw an exception if any send failed.
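
 The steps above can be sketched roughly as below. To keep it self-contained
 this does not use the Kafka client: the send function and flush Runnable are
 stand-ins for an asynchronous send that returns a Future per record and a
 blocking flush, and the BatchSender name is made up. Unlike the proposed void
 signature, it returns the results so they can be inspected.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper illustrating the proposed batchSend behavior.
final class BatchSender {
    static <R, M> List<M> batchSend(List<R> records,
                                    Function<R, Future<M>> send,
                                    Runnable flush) {
        List<Future<M>> futures = new ArrayList<>();
        for (R record : records) {
            futures.add(send.apply(record));       // send records one by one
        }
        flush.run();                                // flush
        List<M> results = new ArrayList<>();
        int failures = 0;
        for (int i = 0; i < futures.size(); i++) {  // iterate on futures and get them
            try {
                results.add(futures.get(i).get());
            } catch (ExecutionException e) {
                failures++;                         // log a detailed message on each error
                System.err.println("send failed for record " + i + ": " + e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for sends", e);
            }
        }
        if (failures > 0) {                         // throw an exception if any send failed
            throw new IllegalStateException(failures + " of " + records.size() + " sends failed");
        }
        return results;
    }
}
```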

 It reproduces the old behavior - which apparently everyone really liked,
 and I don't think it is overly weird. It is very limited, but anyone who
 needs more control over his sends already has plenty of options.

 Thoughts?

 Gwen




 On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey guys,
 
  The locking argument is correct for very small records (< 50 bytes),
  batching will help here because for small records locking becomes the big
  bottleneck. I think these use cases are rare but not unreasonable.

  Overall I'd emphasize that the new producer is way faster at virtually all
  use cases. If there is a use case where that isn't true, let's look at it
  in a data driven way by comparing the old producer to the new producer and
  looking for any areas where things got worse.

  I suspect the reducing allocations argument to be not a big thing. We do
  a number of small per-message allocations and it didn't seem to have much
  impact. I do think there are a couple of big producer memory optimizations
  we could do by reusing the arrays in the accumulator in the serialization
  of the request but I don't think this is one of them.

  I'd be skeptical of any api that was too weird--i.e. introduces a new way
  of partitioning, gives back errors on a per-partition rather than per
  message basis (given that partitioning is transparent this is really hard
  to think about), etc. Bad apis end up causing a ton of churn and just don't
  end up being a good long term commitment as we change how the underlying
  code works over time (i.e. we hyper optimize for something then have to
  maintain some super weird api as it becomes hyper unoptimized for the
  client over time

Re: New Producer API - batched sync mode support

2015-04-28 Thread Jay Kreps
Hey guys,

The locking argument is correct for very small records (< 50 bytes),
batching will help here because for small records locking becomes the big
bottleneck. I think these use cases are rare but not unreasonable.

Overall I'd emphasize that the new producer is way faster at virtually all
use cases. If there is a use case where that isn't true, let's look at it
in a data driven way by comparing the old producer to the new producer and
looking for any areas where things got worse.

I suspect the reducing allocations argument to be not a big thing. We do
a number of small per-message allocations and it didn't seem to have much
impact. I do think there are a couple of big producer memory optimizations
we could do by reusing the arrays in the accumulator in the serialization
of the request but I don't think this is one of them.

I'd be skeptical of any api that was too weird--i.e. introduces a new way
of partitioning, gives back errors on a per-partition rather than per
message basis (given that partitioning is transparent this is really hard
to think about), etc. Bad apis end up causing a ton of churn and just don't
end up being a good long term commitment as we change how the underlying
code works over time (i.e. we hyper optimize for something then have to
maintain some super weird api as it becomes hyper unoptimized for the
client over time).

Roshan--Flush works as you would hope, it blocks on the completion of all
outstanding requests. Calling get on the future for the request gives you
the associated error code back. Flush doesn't throw any exceptions because
waiting for requests to complete doesn't error, the individual requests
fail or succeed which is always reported with each request.

Ivan--The batches you send in the scala producer today actually aren't
truly atomic, they just get sent in a single request.

One tricky problem to solve when users do batching is size limits on
requests. This can be very hard to manage since predicting the serialized
size of a bunch of java objects is not always obvious. This was repeatedly
a problem before.

-Jay

On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov ibalas...@gmail.com wrote:

 I must agree with @Roshan – it's hard to imagine anything more intuitive
 and easy to use for atomic batching as old sync batch api. Also, it's fast.
 Coupled with a separate instance of producer per
 broker:port:topic:partition it works very well. I would be glad if it finds
 its way into new producer api.

 On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
 fetchSize must be set at least as batch bytes (before or after
 compression), otherwise client risks not getting any messages?



Re: New Java Producer: Single Producer vs multiple Producers

2015-04-27 Thread Jay Kreps
Hey Jiangjie,

Yeah, not sure what the bottleneck is. It may be the sender or lock contention on
the writer threads. You could use top or one of the java tools to check out
the per-thread cpu usage.

The benchmarking I had done previously showed an ability to max out the 1G
network cards we had with a single thread down to pretty small messages. So
I guess there is really no reason in that scenario to have lots of
producers. I think repeating the benchmarking and optimization process on
10g would be a useful exercise.

The reason for generally recommending just one producer instance is that
generally better batching actually makes both the producer and the broker
faster. So having a bunch of under-used producers tends to lead to small
requests which isn't great. But this isn't a hard and fast rule, I strongly
suspect it's possible to max out lock contention on the accumulator as well
as the sender thread (not sure what happens first).

The new producer is pretty new still so I suspect there is a fair amount of
low-hanging performance work for anyone who wanted to take a shot at it.

-Jay

On Mon, Apr 27, 2015 at 10:55 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Jay,

 Does o.a.k.clients.tools.ProducerPerformance provide multi-thread test? I
 did not find it.

 I tweaked the test a little bit to make it multi-threaded and what I found
 is that in a single thread case, with each message of 10 bytes, single
 caller thread has ~2M messages/second throughput. 2 threads gives ~1.2 M
 messages/second each, 10 threads gives ~0.11M messages/second each. My
 topic has 8 partitions.

 It looks like, in my test, the bottleneck is actually the sender thread (not
 network bandwidth) instead of number of caller threads because I'm sending
 really small uncompressed messages. In this case, you do need more than
 one producer.

 From what I understand, the main reasons we recommend to share a producer
 are:
 1. We have a per partition lock in the producer, so lock contention should
 not be an issue assuming there are many partitions to send data to.
 2. The caller threads usually are much slower than sender thread because
 they have to do the compression.

 So I guess in general cases sharing a producer would provide good
 performance and save memory footprint. But if sender thread becomes a
 bottleneck, it is time to have more producers.

 Please correct me if I miss something.

 Thanks.

 Jiangjie (Becket) Qin

 On 4/24/15, 3:23 PM, Jay Kreps jay.kr...@gmail.com wrote:

 That should work. I recommend using the performance tool cited in the blog
 linked from the performance page of the website. That tool is more
 accurate and uses the new producer.
 
 On Fri, Apr 24, 2015 at 2:29 PM, Roshan Naik ros...@hortonworks.com
 wrote:
 
  Can we use the new 0.8.2 producer perf tool against a 0.8.1 broker ?
  -roshan
 
 
  On 4/24/15 1:19 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Do make sure if you are at all performance sensitive you are using the
 new
  producer api we released in 0.8.2.
  
  -Jay
  
  On Fri, Apr 24, 2015 at 12:46 PM, Roshan Naik ros...@hortonworks.com
  wrote:
  
   Yes, I too notice the same behavior (with producer/consumer perf tool on
   8.1.2)... adding more threads indeed improved the perf a lot (both with and
   without --sync). In --sync mode batch size made almost no diff, larger
   events improved the perf.

   I was doing some 8.1.2 perf testing with a 1 node broker setup (machine:
   32 cpu cores, 256gb RAM, 10gig ethernet, 1 x 15000rpm disks).

   My observations:

   ASYNC MODE:
   Partition Count: large improvement when going from 1 to 2, beyond 2 see a
   slight dip
   Number of producer threads: perf much better than sync mode with 1
   thread, perf peaks out with ~10 threads, beyond 10 thds perf impacted
   negatively

   SYNC MODE (does not seem to use batch size)
   Batch Size: little to no impact
   Event Size: perf scales linearly with event size
   Number of producer threads: poor perf with one thread, improves with more
   threads, peaks around 30 to 50 threads
   socket.send.buffer.bytes: increasing it made a small but measurable
   difference (~4%)

   --SYNC mode was much slower.

   I modified the producer perf tool to use the scala batched producer api
   (not available in v8.2) --sync mode and perf of --sync mode was closer to
   async mode.
  
   -roshan
  
  
  
   On 4/24/15 11:42 AM, Navneet Gupta (Tech - BLR)
   navneet.gu...@flipkart.com wrote:
  
   Hi,
   
   I ran some tests on our cluster by sending message from multiple
  clients
   (machines). Each machine had about 40-100 threads per producer.
   
   I thought of trying out having multiple producers per clients with
 each
   producer receiving messages from say 10-15 threads. I actually did
 see
  an
   increase in throughput in this case. It was not one off cases but a
   repeatable phenomenon. I called threads

Re: New and old producers partition messages differently

2015-04-27 Thread Jay Kreps
Yeah I agree we could have handled this better. I think the story we have
now is that you can override it using the partition argument in the
producer (and when we get the patch for pluggable producer we can bundle a
LegacyPartitioner or something like that).
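
Until then, one way to use the partition argument is to compute the partition
yourself from hashCode() and pass it explicitly when building each record. The
following is only a sketch: LegacyPartitioning is a made-up name, and masking
off the sign bit is this sketch's own choice, which is not guaranteed to match
what the old Utils.abs did for negative hash codes, so check it against the old
code before relying on it.

```java
// Hypothetical helper, not part of Kafka.
final class LegacyPartitioning {
    // Maps a key to a partition using hashCode(), in the spirit of the old
    // default partitioner. The sign handling is an assumption of this sketch.
    static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```

The returned value is what you would hand to the producer as the explicit
partition for that key.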

The reason for murmur2 over 3 was that it had a good single-class java
implementation. The only murmur3 impl I could find was extremely complex
and hard to bundle, and I really wanted to avoid depending on something
like Guava which ends up being kind of a nightmare from a dependency mgmt
perspective for client libs.

-Jay

On Sun, Apr 26, 2015 at 9:03 PM, Gwen Shapira gshap...@cloudera.com wrote:

 Definitely +1 for advertising this in the docs.

 What I can't figure out is the upgrade path... if my application assumes
 that all data for a single user is in one partition (so it subscribes to a
 single partition and expects everything about a specific subset of users to
 be in that partition), this assumption will not survive an upgrade to
 0.8.2.X.  I think the assumption of stable hash partitions even after
 upgrades is pretty reasonable (i.e. I made it about a gazillion times without
 thinking twice). Note that in this story my app wasn't even upgraded - it
 broke because a producer upgraded to a new API.

 If we advertise: "upgrading to the new producer API may break consumers",
 we may need to offer a work-around to allow people to upgrade producers
 anyway.
 Perhaps we can say "wait for Sriharsha's partitioner patch and write a
 custom partitioner that uses hashcode()".

 Gwen



 On Sun, Apr 26, 2015 at 7:57 AM, Jay Kreps jay.kr...@gmail.com wrote:

  This was actually intentional.
 
  The problem with relying on hashCode is that
  (1) it is often a very bad hash function,
  (2) it is not guaranteed to be consistent from run to run (i.e. if you
  restart the jvm the value of hashing the same key can change!),
  (3) it is not available outside the jvm so non-java producers can't use the
  same function.
 
  In general at the moment different producers don't use the same hash code
  so I think this is not quite as bad as it sounds. Though it would be good
  to standardize things.
 
  I think the most obvious thing we could do here would be to do a much
  better job of advertising this in the docs, though, so people don't get
  bitten by it.
 
  -Jay
 
  On Fri, Apr 24, 2015 at 5:48 PM, James Cheng jch...@tivo.com wrote:
 
   Hi,
  
   I was playing with the new producer in 0.8.2.1 using partition keys
   (semantic partitioning I believe is the phrase?). I noticed that the
   default partitioner in 0.8.2.1 does not partition items the same way as the
   old 0.8.1.1 default partitioner was doing. For a test item, the old
   producer was sending it to partition 0, whereas the new producer was
   sending it to partition 4.

   Digging in the code, it appears that the partitioning logic is different
   between the old and new producers. Both of them hash the key, but they use
   different hashing algorithms.

   Old partitioner:
   ./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:

     def partition(key: Any, numPartitions: Int): Int = {
       Utils.abs(key.hashCode) % numPartitions
     }

   New partitioner:
   ./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:

   } else {
       // hash the key to choose a partition
       return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
   }

   Where murmur2 is a custom hashing algorithm. (I'm assuming that murmur2
   isn't the same logic as hashCode, especially since hashCode is
   overrideable).

   Was it intentional that the hashing algorithm would change between the old
   and new producer? If so, was this documented? I don't know if anyone was
   relying on the old default partitioner, as opposed to going round-robin or
   using their own custom partitioner. Do you expect it to change in the
   future? I'm guessing that one of the main reasons to have a custom hashing
   algorithm is so that you are in full control of the partitioning and can keep
   it stable (as opposed to being reliant on hashCode()).
  
   Thanks,
   -James
  
  
 



Re: New and old producers partition messages differently

2015-04-26 Thread Jay Kreps
This was actually intentional.

The problem with relying on hashCode is that
(1) it is often a very bad hash function,
(2) it is not guaranteed to be consistent from run to run (i.e. if you
restart the jvm the value of hashing the same key can change!),
(3) it is not available outside the jvm so non-java producers can't use the
same function.
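
To illustrate point (2) with one concrete case: byte[] does not override
hashCode(), so calling it on a key's bytes gives an identity hash rather than
a hash of the contents. The sketch below contrasts that with a content-based
hash. HashStability and the sample key are made up for illustration, and
Arrays.hashCode is used only as a simple content-based stand-in, not as what
either producer actually does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical demo class, not part of Kafka.
final class HashStability {
    // Content-based hash: equal bytes give the same value in every JVM run.
    static int contentHash(byte[] key) {
        return Arrays.hashCode(key);
    }

    public static void main(String[] args) {
        byte[] a = "user-42".getBytes(StandardCharsets.UTF_8);
        byte[] b = "user-42".getBytes(StandardCharsets.UTF_8);
        // Identity hashes: these will usually differ from each other
        // and can change from one run to the next.
        System.out.println(a.hashCode() + " vs " + b.hashCode());
        // Content hashes: these agree because the bytes are equal.
        System.out.println(contentHash(a) + " vs " + contentHash(b));
    }
}
```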

In general at the moment different producers don't use the same hash code
so I think this is not quite as bad as it sounds. Though it would be good
to standardize things.

I think the most obvious thing we could do here would be to do a much
better job of advertising this in the docs, though, so people don't get
bitten by it.

-Jay

On Fri, Apr 24, 2015 at 5:48 PM, James Cheng jch...@tivo.com wrote:

 Hi,

 I was playing with the new producer in 0.8.2.1 using partition keys
 (semantic partitioning I believe is the phrase?). I noticed that the
 default partitioner in 0.8.2.1 does not partition items the same way as the
 old 0.8.1.1 default partitioner was doing. For a test item, the old
 producer was sending it to partition 0, whereas the new producer was
 sending it to partition 4.

 Digging in the code, it appears that the partitioning logic is different
 between the old and new producers. Both of them hash the key, but they use
 different hashing algorithms.

 Old partitioner:
 ./core/src/main/scala/kafka/producer/DefaultPartitioner.scala:

   def partition(key: Any, numPartitions: Int): Int = {
     Utils.abs(key.hashCode) % numPartitions
   }

 New partitioner:

 ./clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java:

 } else {
     // hash the key to choose a partition
     return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
 }

 Where murmur2 is a custom hashing algorithm. (I'm assuming that murmur2
 isn't the same logic as hashCode, especially since hashCode is
 overrideable).

 Was it intentional that the hashing algorithm would change between the old
 and new producer? If so, was this documented? I don't know if anyone was
 relying on the old default partitioner, as opposed to going round-robin or
 using their own custom partitioner. Do you expect it to change in the
 future? I'm guessing that one of the main reasons to have a custom hashing
 algorithm is so that you are in full control of the partitioning and can keep
 it stable (as opposed to being reliant on hashCode()).

 Thanks,
 -James




Re: New Java Producer: Single Producer vs multiple Producers

2015-04-24 Thread Jay Kreps
That should work. I recommend using the performance tool cited in the blog
linked from the performance page of the website. That tool is more
accurate and uses the new producer.

On Fri, Apr 24, 2015 at 2:29 PM, Roshan Naik ros...@hortonworks.com wrote:

 Can we use the new 0.8.2 producer perf tool against a 0.8.1 broker ?
 -roshan


 On 4/24/15 1:19 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Do make sure if you are at all performance sensitive you are using the new
 producer api we released in 0.8.2.
 
 -Jay
 
 On Fri, Apr 24, 2015 at 12:46 PM, Roshan Naik ros...@hortonworks.com
 wrote:
 
  Yes, I too notice the same behavior (with producer/consumer perf tool on
  8.1.2)... adding more threads indeed improved the perf a lot (both with and
  without --sync). In --sync mode batch size made almost no diff, larger
  events improved the perf.

  I was doing some 8.1.2 perf testing with a 1 node broker setup (machine:
  32 cpu cores, 256gb RAM, 10gig ethernet, 1 x 15000rpm disks).

  My observations:

  ASYNC MODE:
  Partition Count: large improvement when going from 1 to 2, beyond 2 see a
  slight dip
  Number of producer threads: perf much better than sync mode with 1
  thread, perf peaks out with ~10 threads, beyond 10 thds perf impacted
  negatively

  SYNC MODE (does not seem to use batch size)
  Batch Size: little to no impact
  Event Size: perf scales linearly with event size
  Number of producer threads: poor perf with one thread, improves with more
  threads, peaks around 30 to 50 threads
  socket.send.buffer.bytes: increasing it made a small but measurable
  difference (~4%)

  --SYNC mode was much slower.

  I modified the producer perf tool to use the scala batched producer api
  (not available in v8.2) --sync mode and perf of --sync mode was closer to
  async mode.
 
 
  -roshan
 
 
 
  On 4/24/15 11:42 AM, Navneet Gupta (Tech - BLR)
  navneet.gu...@flipkart.com wrote:
 
  Hi,
  
  I ran some tests on our cluster by sending message from multiple clients
  (machines). Each machine had about 40-100 threads per producer.

  I thought of trying out having multiple producers per clients with each
  producer receiving messages from say 10-15 threads. I actually did see an
  increase in throughput in this case. It was not one off cases but a
  repeatable phenomenon. I called threads to producer ratio sharingFactor in
  my code.

  I am not planning to use it this way in our clients sending messages to
  Kafka but it did go against the suggestion to have single producer across
  all threads.
  
  
  
  On Fri, Apr 24, 2015 at 10:27 PM, Manikumar Reddy
 ku...@nmsworks.co.in
  wrote:
  
   Hi Jay,
  
   Yes, we are producing from single process/jvm.
  
   From docs The producer will attempt to batch records together into
  fewer
   requests whenever multiple records are being sent to the same
  partition.
  
   If I understand correctly, batching happens at topic/partition level,
  not
   at Node level. right?
  
   If yes, then  both (single producer for all topics , separate
 producer
  for
   each topic) approaches
   may give similar performance.
  
   On Fri, Apr 24, 2015 at 9:29 PM, Jay Kreps jay.kr...@gmail.com
 wrote:
  
If you are talking about within a single process, having one
 producer
  is
generally the fastest because batching dramatically reduces the
  number of
requests (esp using the new java producer).
-Jay
   
On Fri, Apr 24, 2015 at 4:54 AM, Manikumar Reddy 
manikumar.re...@gmail.com
wrote:
   
 We have a 2 node cluster with 100 topics.
 should we use a single producer for all topics or  create
 multiple
 producers?
 What is the best choice w.r.t network load/failures, node
 failures,
 latency, locks?

 Regards,
 Manikumar

   
  
  
  
  
  --
  Thanks  Regards,
  Navneet Gupta
 
 




Re: New Java Producer: Single Producer vs multiple Producers

2015-04-24 Thread Jay Kreps
If you are talking about within a single process, having one producer is
generally the fastest because batching dramatically reduces the number of
requests (esp using the new java producer).
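
To put rough numbers on the batching argument, here is a toy model. It is only a sketch under stated assumptions: the class and method names are made up for illustration, records for one partition are assumed to split evenly across producers, and each producer is assumed to send one request per full or partial batch. It does not reproduce the real producer's internals.

```java
/** Toy model (not Kafka code): how many requests it takes to ship a burst of records. */
final class BatchingToyModel {
    /** Requests one producer needs for `records` records when a batch holds `maxBatchRecords`. */
    static int requests(int records, int maxBatchRecords) {
        return (records + maxBatchRecords - 1) / maxBatchRecords; // ceiling division
    }

    /**
     * Total requests when the same burst is split evenly across `producers` producers.
     * Assumes `records` is divisible by `producers`.
     */
    static int requestsWithProducers(int records, int maxBatchRecords, int producers) {
        int perProducer = records / producers;
        return producers * requests(perProducer, maxBatchRecords);
    }

    public static void main(String[] args) {
        // 100 records for one partition, batches of up to 50 records.
        System.out.println(requestsWithProducers(100, 50, 1));
        System.out.println(requestsWithProducers(100, 50, 10));
    }
}
```

In this model one shared producer fills its batches, while ten separate producers each send a mostly empty batch, so the request count goes up even though the data volume is the same.
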
-Jay

On Fri, Apr 24, 2015 at 4:54 AM, Manikumar Reddy manikumar.re...@gmail.com
wrote:

 We have a 2 node cluster with 100 topics.
 Should we use a single producer for all topics or create multiple
 producers?
 What is the best choice w.r.t network load/failures, node failures,
 latency, locks?

 Regards,
 Manikumar



Re: New Java Producer: Single Producer vs multiple Producers

2015-04-24 Thread Jay Kreps
Do make sure if you are at all performance sensitive you are using the new
producer api we released in 0.8.2.

-Jay

On Fri, Apr 24, 2015 at 12:46 PM, Roshan Naik ros...@hortonworks.com
wrote:

 Yes, I too noticed the same behavior (with the producer/consumer perf tool on
 8.1.2)... adding more threads indeed improved the perf a lot (both with and
 without --sync). In --sync mode, batch size made almost no difference, while
 larger events improved the perf.

 I was doing some 8.1.2 perf testing with a 1 node broker setup (machine:
 32 cpu cores, 256gb RAM, 10gig ethernet, 1 x 15000rpm disk).

 My observations:

 ASYNC MODE:
 Partition Count: large improvement when going from 1 to 2, beyond 2 see a
 slight dip
 Number of producer threads: perf much better than sync mode with 1
 thread, perf peaks out with ~10 threads, beyond 10 threads perf impacted
 negatively

 SYNC MODE (does not seem to use batch size):
 Batch Size: little to no impact
 Event Size: perf scales linearly with event size
 Number of producer threads: poor perf with one thread, improves with more
 threads, peaks around 30 to 50 threads
 socket.send.buffer.bytes: increasing it made a small but measurable
 difference (~4%)


 --SYNC mode was much slower.


 I modified the producer perf tool to use the scala batched producer api
 (not available in v8.2) in --sync mode, and the perf of --sync mode was then
 closer to async mode.


 -roshan



 On 4/24/15 11:42 AM, Navneet Gupta (Tech - BLR)
 navneet.gu...@flipkart.com wrote:

 Hi,
 
 I ran some tests on our cluster by sending messages from multiple clients
 (machines). Each machine had about 40-100 threads per producer.
 
 I thought of trying out multiple producers per client, with each
 producer receiving messages from, say, 10-15 threads. I actually did see an
 increase in throughput in this case. It was not a one-off case but a
 repeatable phenomenon. I called the threads-to-producer ratio sharingFactor in
 my code.
 
 I am not planning to use it this way in our clients sending messages to
 Kafka, but it did go against the suggestion to have a single producer across
 all threads.
 
 
 
 On Fri, Apr 24, 2015 at 10:27 PM, Manikumar Reddy ku...@nmsworks.co.in
 wrote:
 
  Hi Jay,
 
  Yes, we are producing from a single process/JVM.
 
  From the docs: "The producer will attempt to batch records together into
  fewer requests whenever multiple records are being sent to the same
  partition."
 
  If I understand correctly, batching happens at the topic/partition level,
  not at the node level, right?
 
  If yes, then both approaches (a single producer for all topics, or a
  separate producer for each topic) may give similar performance.
 
  On Fri, Apr 24, 2015 at 9:29 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
   If you are talking about within a single process, having one producer is
   generally the fastest because batching dramatically reduces the number of
   requests (esp using the new java producer).
   -Jay
  
   On Fri, Apr 24, 2015 at 4:54 AM, Manikumar Reddy manikumar.re...@gmail.com
   wrote:
  
    We have a 2 node cluster with 100 topics.
    Should we use a single producer for all topics or create multiple
    producers?
    What is the best choice w.r.t network load/failures, node failures,
    latency, locks?
   
    Regards,
    Manikumar
   
  
 
 
 
 
 --
 Thanks & Regards,
 Navneet Gupta




Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Jay Kreps
Hey Harsha,

A few comments:

Can you finish up the KIP? There are some unfinished sentences and odd
whitespace things going on.

Here are the questions I think we should consider:
1. Do we need this at all given that we have the partition argument in
ProducerRecord which gives full control? I think we do need it because this
is a way to plug in a different partitioning strategy at run time and do it
in a fairly transparent way.
2. We certainly need to add both the serialized and unserialized form for
the key as both are useful.
3. Do we need to add the value? I suspect people will have uses for
computing something off a few fields in the value to choose the partition.
This would be useful in cases where the key was being used for log
compaction purposes and did not contain the full information for computing
the partition.
4. This interface doesn't include either an init() or close() method. It
should implement Closeable and Configurable, right?
5. What happens if the user both sets the partition id in the
ProducerRecord and sets a partitioner? Does the partition id just get
passed in to the partitioner (as sort of implied in this interface?). This
is a bit weird since if you pass in the partition id you kind of expect it
to get used, right? Or is it the case that if you specify a partition the
partitioner isn't used at all (in which case no point in including
partition in the Partitioner api).

Cheers,

-Jay

On Thu, Apr 23, 2015 at 6:55 AM, Sriharsha Chintalapani ka...@harsha.io
wrote:

 Hi,
 Here is the KIP for adding a partitioner interface for producer.

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-+22+-+Expose+a+Partitioner+interface+in+the+new+producer
 There is one open question about what the interface should look like. Please
 take a look and let me know if you prefer one way or the other.

 Thanks,
 Harsha




Re: [KIP-DISCUSSION] KIP-22 Expose a Partitioner interface in the new producer

2015-04-23 Thread Jay Kreps
Hey Sriharsha,

Great, thanks!

For 4:

Yeah the use case for init and close is making use of any kind of metadata.
An example of this would be if you are trying to do range partitioning you
need to map lexicographic ranges to numeric partitions. You might do this
by adding a new property to the config such as
   partitioner.metadata=a:0, b:1, ..., z:25
Or likewise you might have a partitioner built using Java's digest
interface and the config would be the digest algorithm name.

Or you might need to maintain this dynamically and have the partitioner
fetch this list on initialization from some central store (zk or whatever).

The init method we should use is the Configurable interface, that will
automatically pass in the configuration given to the producer at
instantiation time.
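
To make the range partitioning case concrete, here is a minimal sketch of a partitioner that reads its ranges from config at init time. It is not the proposed interface: the class name, the `partition(String)` method and the sample ranges are assumptions for illustration, and only the `configure(Map)` shape is borrowed from Configurable. The `partitioner.metadata` property is the hypothetical one from the example above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Toy range partitioner (illustration only): maps a key to the range it falls in. */
final class RangePartitionerSketch {
    // Lower bound of each lexicographic range -> partition number.
    private final TreeMap<String, Integer> ranges = new TreeMap<>();

    /** Receives the producer's config map, in the style of Configurable.configure. */
    public void configure(Map<String, ?> configs) {
        Object raw = configs.get("partitioner.metadata");
        if (raw == null) {
            throw new IllegalArgumentException("partitioner.metadata is required");
        }
        // Expected form: "lowerBound:partition, lowerBound:partition, ..."
        for (String entry : raw.toString().split(",")) {
            String[] parts = entry.trim().split(":");
            ranges.put(parts[0].trim(), Integer.parseInt(parts[1].trim()));
        }
    }

    /** Picks the range whose lower bound is the greatest one not above the key. */
    public int partition(String key) {
        Map.Entry<String, Integer> range = ranges.floorEntry(key);
        if (range == null) {
            throw new IllegalArgumentException("no range covers key: " + key);
        }
        return range.getValue();
    }

    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("partitioner.metadata", "a:0, h:1, q:2"); // sample ranges, made up
        RangePartitionerSketch partitioner = new RangePartitionerSketch();
        partitioner.configure(config);
        System.out.println(partitioner.partition("apple"));
        System.out.println(partitioner.partition("kafka"));
        System.out.println(partitioner.partition("zebra"));
    }
}
```

Without something like configure() the range table would have to be hard-coded or smuggled in through a static, which is the awkwardness I am worried about.
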

I agree that these additional methods add a bit of complexity and often
aren't needed, but on the flip side it is often hard to use the interface
without them when you do need them.

5. Yeah that's how the current partitioner works, but that is just because
it is a non-public interface. It's not clear to me if the partitioner
should override the partition or not. We could either say:
a. The partitioner is the default policy and the partition field is a way
to override that on a per-record basis for cases where you need that or
where it is simpler. If this is our description then the partitioner should
only take effect if partition==null
b. The partition the user specifies is just a suggestion and the
partitioner can interpret or override that in whatever way they want.

I think (a) may actually make more sense. The reason is because otherwise
the behavior of the partition field in ProducerRecord will be very hard to
depend on as the effect it has will be totally dependent on the partitioner
that is set. Any correct partitioner will basically have to implement the
case where the partition is set and I think the only sensible thing then is
to use it as the partition (right?).
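
Option (a) can be sketched in a few lines. This is only an illustration of the policy, not producer code: `choosePartition` and the hash-based stand-in partitioner are made-up names and assumptions.

```java
import java.util.function.ToIntFunction;

/** Illustration of option (a): an explicit partition wins, otherwise ask the partitioner. */
final class PartitionChoiceSketch {
    static int choosePartition(Integer explicitPartition, String key,
                               ToIntFunction<String> partitioner) {
        if (explicitPartition != null) {
            return explicitPartition; // per-record override, the partitioner is not consulted
        }
        return partitioner.applyAsInt(key); // default policy
    }

    public static void main(String[] args) {
        int numPartitions = 4;
        // Stand-in partitioner (an assumption): non-negative hash modulo the partition count.
        ToIntFunction<String> hashPartitioner =
                key -> Math.floorMod(key.hashCode(), numPartitions);
        System.out.println(choosePartition(3, "some-key", hashPartitioner));
        System.out.println(choosePartition(null, "some-key", hashPartitioner));
    }
}
```

With this policy the partition field behaves the same no matter which partitioner is plugged in, which is the property I care about.
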

Dunno, what do you think...?

-Jay

On Thu, Apr 23, 2015 at 2:59 PM, Sriharsha Chintalapani 
harsh...@fastmail.fm wrote:

 Hi Jay,
  Sorry about the KIP formatting. I fixed those issues in the KIP.

 2. We certainly need to add both the serialized and unserialized form for
 the key as both are useful.

 I added those to the interface.

 3. Do we need to add the value? I suspect people will have uses for
 computing something off a few fields in the value to choose the partition.
 This would be useful in cases where the key was being used for log
 compaction purposes and did not contain the full information for computing
 the partition.

 added it as well.

 4. This interface doesn't include either an init() or close() method. It
 should implement Closeable and Configurable, right?

 I am not quite sure about having init() or close() for the partitioner. Are we
 looking at the partitioner using some external resources to initialize and
 close? If that's the case then init() should also take in some config as a
 param, which can add more complexity.


 5. What happens if the user both sets the partition id in the
 ProducerRecord and sets a partitioner? Does the partition id just get
 passed in to the partitioner (as sort of implied in this interface?). This
 is a bit weird since if you pass in the partition id you kind of expect it
 to get used, right? Or is it the case that if you specify a partition the
 partitioner isn't used at all (in which case no point in including
 partition in the Partitioner api).

 In the current ProducerRecord, the partition id is passed to the Partitioner.
 If a custom partitioner is not going to use it, then that's up to its
 implementation, right? Similarly, in our interface we have the value as another
 param, which may or may not be used. Essentially it's up to the Partitioner to
 disclose what available information it is going to partition on.

 Thanks,
 Harsha



Re: [DISCUSS] KIP-12 - Kafka Sasl/Kerberos implementation

2015-04-21 Thread Jay Kreps
Hey Sriharsha,

Thanks for the excellent write-up.

Couple of minor questions:

1. Isn't the blocking handshake going to be a performance concern? Can we
do the handshake non-blocking instead? If anything that causes connections
to drop can incur blocking network roundtrips won't that eat up all the
network threads immediately? I guess I would have to look at that code to
know...

2. Do we need to support blocking channel at all? That is just for the old
clients, and I think we should probably just leave those be to reduce scope
here.

3. Can we change the APIs to drop the getters when they are not required by
the API being implemented? In general we don't use setters and getters as a
naming convention.

The long explanation on that is that setters/getters kind of imply a style
of java programming where you have simple structs with getters and setters
for each field. In general we try to have access methods only when
necessary, and rather than setters model the full change or action being
carried out, and if possible disallow change entirely. This is more in line
with modern java style I think. We aren't perfect in following this, but
once you start with getters and setters people start just adding them
everywhere and then using them.
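
As a small illustration of the style I mean (a hypothetical class, not anything from the KIP): fields are final, accessors are plain nouns and exist only because callers need them, and a change is modeled as an operation that returns a new value rather than a setter.

```java
/** Illustration only: a hypothetical immutable value type without get/set naming. */
final class Endpoint {
    private final String host;
    private final int port;

    Endpoint(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Access methods only where needed, named after what they return.
    String host() { return host; }
    int port() { return port; }

    // Instead of setPort(...), model the change and return a new instance.
    Endpoint withPort(int newPort) {
        return new Endpoint(host, newPort);
    }

    public static void main(String[] args) {
        Endpoint plain = new Endpoint("broker1", 9092);
        Endpoint other = plain.withPort(9093);
        System.out.println(plain.host() + ":" + plain.port());
        System.out.println(other.host() + ":" + other.port());
    }
}
```
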

-Jay


On Mon, Apr 20, 2015 at 10:42 AM, Sriharsha Chintalapani ka...@harsha.io
wrote:

 Hi,
  I updated the KIP-12 with more details. Please take a look
 https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=51809888

 Thanks,
 Harsha


 On February 11, 2015 at 10:02:43 AM, Harsha (ka...@harsha.io) wrote:

 Thanks Joe. It will be part of KafkaServer and will run on its own
 thread. Since each kafka server will run with a keytab we should make
 sure they are all getting renewed.

 On Wed, Feb 11, 2015, at 10:00 AM, Joe Stein wrote:
  Thanks Harsha, looks good so far. How were you thinking of running
  the KerberosTicketManager: as a standalone process, like the controller, or
  is it a layer of code that does the plumbing pieces everywhere?
 
  ~ Joestein
 
  On Wed, Feb 11, 2015 at 12:18 PM, Harsha ka...@harsha.io wrote:
 
   Hi,
   Here is the initial proposal for sasl/kerberos implementation for
   kafka https://cwiki.apache.org/confluence/x/YI4WAw
   and JIRA https://issues.apache.org/jira/browse/KAFKA-1686. I am
   currently working on a prototype which will add more details to the KIP.
   Just opening the thread to say the work is in progress. I'll update the
   thread with an initial prototype patch.
   Thanks,
   Harsha
  



Re: Number of Partitions and Performance

2015-04-07 Thread Jay Kreps
I think the blog post was giving that as an upper bound not a recommended
size. I think that blog goes through some of the trade offs of having more
or fewer partitions.
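
For reference, the arithmetic behind the 100 x b x r figure quoted below is just the following. The class and method names are made up for illustration; the result is a ceiling to stay under, not a target.

```java
/** Arithmetic for the 100 x b x r rule of thumb (an upper bound, not a recommendation). */
final class PartitionUpperBound {
    /** b = number of brokers, r = replication factor. */
    static int upperBound(int brokers, int replicationFactor) {
        return 100 * brokers * replicationFactor;
    }

    public static void main(String[] args) {
        // 10 brokers with replication factor 2, as in the question.
        System.out.println(upperBound(10, 2));
    }
}
```
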

-Jay

On Tue, Apr 7, 2015 at 10:13 AM, François Méthot fmetho...@gmail.com
wrote:

 Hi,

   We initially had configured our topics to have between 8 and 16 partitions
 each on a cluster of 10 brokers (vm with 2 cores, 16 MB ram, a few TB of SAN
 disk).

 Then I came across the rule-of-thumb formula *100 x b x r*
 (http://blog.confluent.io/2015/03/12/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/).

 100 x 10 brokers x 2 replication = 2000 partitions.

 We gave it a try, but our single-threaded kafka producer performance
 dropped by 80%.

 What are the benefits of having that many partitions?

 Is there any problem in the long run with using a topic with as few as 16
 partitions?


 Francois



Re: Java NPE in using KafkaConsumer API

2015-03-30 Thread Jay Kreps
Are you using the 0.8.2 release or trunk?

-Jay

On Mon, Mar 30, 2015 at 1:35 AM, Sandeep Bishnoi 
sandeepbishnoi.b...@gmail.com wrote:

 Hi,

  I have configured a kafka consumer as follows:
 Properties props = new Properties();
  // ..
  // Populated properties
 KafkaConsumer consumer = new KafkaConsumer(props);

 // subscribe to partition 0 of topic test
 TopicPartition partition0 = new TopicPartition("test", 0);
 TopicPartition[] partitions = new TopicPartition[1];
 partitions[0] = partition0;
 consumer.subscribe(partitions);
 System.out.println("Created consumer " + consumer);

   Consumer is getting created without any errors.

  Now I am following instructions on how to read from a partition using

 http://kafka.apache.org/083/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html
 .

  Here is a code snippet for the same:

 boolean isRunning = true;
 Map<TopicPartition, Long> consumedOffsets = new HashMap<TopicPartition, Long>();
 while (isRunning)
 {
   Map<String, ConsumerRecords<byte[], byte[]>> records =
       consumer.poll(100);
  ...
  .

   The consumer.poll() api always returns null, although the remaining
 code depends on the value returned from poll.

   Can you please let me know whether
 org.apache.kafka.clients.consumer.KafkaConsumer is a valid API to use if I
 want to create a client which can read from a given partition of a given
 topic?

 Best Regards,
 Sandeep



Re: Kafka 0.9 consumer API

2015-03-19 Thread Jay Kreps
:-)

On Thursday, March 19, 2015, James Cheng jch...@tivo.com wrote:

 Those are pretty much the best javadocs I've ever seen. :)

 Nice job, Kafka team.

 -James

  On Mar 19, 2015, at 9:40 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Err, here:
 
 http://kafka.apache.org/083/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
 
  -Jay
 
  On Thu, Mar 19, 2015 at 9:40 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
  The current work in progress is documented here:
 
 
  On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian ra...@signalfuse.com wrote:
 
  Is there a link to the proposed new consumer non-blocking API?
 
  Thanks,
  Rajiv
 
 
 




Re: Kafka 0.9 consumer API

2015-03-19 Thread Jay Kreps
The current work in progress is documented here:


On Thu, Mar 19, 2015 at 7:18 PM, Rajiv Kurian ra...@signalfuse.com wrote:

 Is there a link to the proposed new consumer non-blocking API?

 Thanks,
 Rajiv



Re: Database Replication Question

2015-03-12 Thread Jay Kreps
Xiao,

Not sure about AIX or HP-UX. There are some people running on Windows,
though we don't do real systematic testing against that. I would be surprised
if z/OS worked; someone would have to try.

The existing fsync policy already works at the batch level, and Kafka
already does batching quite aggressively. If you set the fsync policy to
force fsync, you get an acknowledgement that the fsync occurred.

In any case, it has not been my experience that datacenter power outages
are a common failure mode whereas disk failures happen continually.

I agree that the prototype transaction support would not be a good fit for
trying to commit each message in a transaction.

-Jay



On Mon, Mar 9, 2015 at 10:56 PM, Xiao lixiao1...@gmail.com wrote:

 Hi, Jay,

 Thank you!

 The Kafka documentation says “Kafka should run well on any unix system”. I
 assume that includes the two major Unix versions, IBM AIX and HP-UX. Right?

 1. Unfortunately, we aim to support all the platforms: Linux, Unix,
 Windows and especially z/OS. I know z/OS is not easy to support.

 2. Fsync per message is very expensive and Fsync per batch will break the
 transaction atomicity. We are looking for transaction-level fsync, which is
 more efficient. Then, our producers can easily combine multiple small
 transactions into a single bigger batch transaction. I hope the
 implementation of the ongoing Kafka feature “Transactional Messaging in
 Kafka” already considered all these issues, although the following link
 does not mention it:
 https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

 3. After a quick reading of the design of “Transactional Messaging in
 Kafka”, I doubt whether it can scale, especially when most transactions
 are very short (e.g., containing a single message).
 - In many use cases, we do not need global transactions, which
 might be too expensive. The partition-specific transaction granularity
 might be fine.
 - To reduce the overhead of transaction-level fsync + network or
 address-space roundtrips, Kafka might need two extra parameters for
 batching the small transactions into a single one. The performance benefits
 of batching are huge, as shown in the MRI (multi-row insert) feature in
 Oracle and DB2 z/OS.

 I believe transactional messaging is a critical feature. The design
 document is not very clear. Do you have more materials or links about it?

 Thanks,

 Xiao Li


 




Re: Got java.util.IllegalFormatConversionException when running MirrorMaker off trunk code

2015-03-08 Thread Jay Kreps
Hey guys,

If we checked in obviously broken code on trunk, let's fix it now or revert
that change.

-Jay

On Sat, Mar 7, 2015 at 12:48 AM, Jiangjie Qin j...@linkedin.com.invalid
wrote:

 Hi Tao,

 Thanks a lot for finding the bug. We are actually rewriting the mirror
 maker in KAFKA-1997 with a much simplified solution using the newly added
 flush() call in new java producer.
 Mirror maker in current trunk is also missing one necessary
 synchronization - the UncheckedOffsets.removeOffset is not synchronized. I
 am hesitating whether to fix those problems in current trunk or just
 waiting for KAFKA-1997 to be checked in. If you have a strong opinion
 about this, we can probably fix those 2 issues in the trunk. It should be
 a small patch but I just don't want people to get distracted.

 Jiangjie (Becket) Qin

 On 3/6/15, 10:15 PM, tao xiao xiaotao...@gmail.com wrote:

 I think I worked out the root cause
 
 Line 593 in MirrorMaker.scala
 
 trace("Updating offset for %s to %d".format(topicPartition, offset))
 should be
 
 trace("Updating offset for %s to %d".format(topicPartition,
 offset.element))
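
 The failure mode can be reproduced outside MirrorMaker. The sketch below is plain Java rather than the Scala in trunk, and `Wrapped` is a hypothetical stand-in for the offset wrapper object: passing the wrapper to a %d conversion is rejected, while passing the number inside it is accepted.

```java
import java.util.IllegalFormatConversionException;

/** Stand-alone reproduction of a %d conversion applied to a non-integer argument. */
final class FormatMismatchDemo {
    /** Hypothetical stand-in for an object that wraps a numeric offset. */
    static final class Wrapped {
        final long element;
        Wrapped(long element) { this.element = element; }
    }

    /** Returns true if formatting the argument with %d is rejected. */
    static boolean failsWithPercentD(Object arg) {
        try {
            String.format("Updating offset for %s to %d", "topic-0", arg);
            return false;
        } catch (IllegalFormatConversionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Wrapped offset = new Wrapped(42L);
        System.out.println(failsWithPercentD(offset));         // the wrapper itself
        System.out.println(failsWithPercentD(offset.element)); // the long inside it
    }
}
```
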
 
 
 On Sat, Mar 7, 2015 at 2:12 AM, tao xiao xiaotao...@gmail.com wrote:
 
  A bit more context: I turned on async in producer.properties
 
  On Sat, Mar 7, 2015 at 2:09 AM, tao xiao xiaotao...@gmail.com wrote:
 
  Hi team,
 
  I am getting a java.util.IllegalFormatConversionException when running
  MirrorMaker with the log level set to trace. The code is off the latest trunk
  with commit 8f0003f9b694b4da5fbd2f86db872d77a43eb63f
 
  The way I bring it up is
 
  bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config
  ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/consumer.properties
  --producer.config
  ~/Downloads/kafka/kafka_2.10-0.8.2.0/config/producer.properties
  --num.streams 1 --num.producers 1 --no.data.loss --whitelist
  mm-benchmark-test\\w* --offset.commit.interval.ms 1
  --queue.byte.size 1024
  and set the log level to trace in tools-log4j.properties
 
  here is the log snippet
 
  [2015-03-07 02:04:27,211] TRACE [mirrormaker-producer-0] Sending
 message
  with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
 
  [2015-03-07 02:04:27,211] TRACE Sending record
  ProducerRecord(topic=mm-benchmark-test, partition=null,
 key=[B@130362d0,
  value=[B@434c4f70 with callback
  kafka.tools.MirrorMaker$MirrorMakerProducerCallback@46f36494 to topic
  mm-benchmark-test partition 0
  (org.apache.kafka.clients.producer.KafkaProducer)
 
  [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
 message
  with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
 
  [2015-03-07 02:04:27,212] TRACE Sending record
  ProducerRecord(topic=mm-benchmark-test, partition=null,
 key=[B@54957b67,
  value=[B@21d8d293 with callback
  kafka.tools.MirrorMaker$MirrorMakerProducerCallback@21e8c241 to topic
  mm-benchmark-test partition 0
  (org.apache.kafka.clients.producer.KafkaProducer)
 
  [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
 message
  with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
 
  [2015-03-07 02:04:27,212] TRACE Sending record
  ProducerRecord(topic=mm-benchmark-test, partition=null,
 key=[B@1eed723b,
  value=[B@1acd590b with callback
  kafka.tools.MirrorMaker$MirrorMakerProducerCallback@1f90eeec to topic
  mm-benchmark-test partition 0
  (org.apache.kafka.clients.producer.KafkaProducer)
 
  [2015-03-07 02:04:27,212] TRACE [mirrormaker-producer-0] Sending
 message
  with value size 13 (kafka.tools.MirrorMaker$ProducerThread)
 
  [2015-03-07 02:04:27,212] TRACE Sending record
  ProducerRecord(topic=mm-benchmark-test, partition=null,
 key=[B@3ae8a936,
  value=[B@bd3671 with callback
  kafka.tools.MirrorMaker$MirrorMakerProducerCallback@6413518 to topic
  mm-benchmark-test partition 0
  (org.apache.kafka.clients.producer.KafkaProducer)
 
  [2015-03-07 02:04:27,212] ERROR Error executing user-provided callback
 on
  message for topic-partition mm-benchmark-test-0:
  (org.apache.kafka.clients.producer.internals.RecordBatch)
 
  java.util.IllegalFormatConversionException: d != kafka.tools.MirrorMaker$UnackedOffset
  at java.util.Formatter$FormatSpecifier.failConversion(Formatter.java:4045)
  at java.util.Formatter$FormatSpecifier.printInteger(Formatter.java:2748)
  at java.util.Formatter$FormatSpecifier.print(Formatter.java:2702)
  at java.util.Formatter.format(Formatter.java:2488)
  at java.util.Formatter.format(Formatter.java:2423)
  at java.lang.String.format(String.java:2790)
  at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
  at scala.collection.immutable.StringOps.format(StringOps.scala:31)
  at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
  at kafka.tools.MirrorMaker$MirrorMakerProducerCallback$$anonfun$onCompletion$2.apply(MirrorMaker.scala:592)
  at kafka.utils.Logging$class.trace(Logging.scala:36)
 
  at 

Re: Database Replication Question

2015-03-07 Thread Jay Kreps
Xiao,

FileChannel.force is fsync on unix.

To force fsync on every message:
log.flush.interval.messages=1

You are looking at the time based fsync, which, naturally, as you say, is
time-based.
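
For anyone who wants to see what "force after every message" means at the Java level, here is a minimal stand-alone sketch. It is not broker code: the class name, method name and temp file are made up for illustration. It appends to a file through a FileChannel and calls force() after each write, which is the per-message behavior that log.flush.interval.messages=1 asks for.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustration only: append a message and force it to the device before returning. */
final class ForceAfterEveryWrite {
    static void appendDurably(FileChannel log, String message) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            log.write(buf);
        }
        // force(true) asks for file content and metadata to be written to storage.
        log.force(true);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("segment", ".log");
        try (FileChannel log = FileChannel.open(file,
                StandardOpenOption.WRITE, StandardOpenOption.APPEND)) {
            appendDurably(log, "message-1\n");
            appendDurably(log, "message-2\n");
        }
        System.out.println(Files.size(file) + " bytes written");
        Files.delete(file);
    }
}
```
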

-Jay

On Fri, Mar 6, 2015 at 11:35 PM, Xiao lixiao1...@gmail.com wrote:

 Hi, Jay,

 Thank you for your answer.

 Sorry, I still do not understand your meaning.

 I guess the two parameters you mentioned are log.flush.interval and
 log.default.flush.interval.ms. However, these two parameters only control
 when Kafka issues a flush (i.e., calling FileChannel.force()).

 Fsync (fileOutputStream.getFD().sync()) is controlled by another parameter
 log.default.flush.scheduler.interval.ms.

   scheduler.schedule("kafka-recovery-point-checkpoint",
  checkpointRecoveryPointOffsets,
  delay = InitialTaskDelayMs,
  period = flushCheckpointMs,
  TimeUnit.MILLISECONDS)

 This thread is only time-controlled. It does not check the number of
 messages.

 Thank you,

 Xiao Li


 On Mar 5, 2015, at 11:59 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Xiao,
 
  That's not quite right. Fsync is controlled by either a time-based
  criterion (flush every 30 seconds) or a number-of-messages criterion. So if
  you set the number of messages to 1 the flush is synchronous with the write,
  which I think is what you are looking for.
 
  -Jay




Re: JMS to Kafka: Inbuilt JMSAdaptor/JMSProxy/JMSBridge (Client can speak JMS but hit Kafka)

2015-03-06 Thread Jay Kreps
I think this is great. I assume the form this would take would be a library
that implements the JMS api that wraps the existing java producer and
consumer?

Our past experience has been that trying to maintain all this stuff
centrally is too hard and tends to stifle rather than support innovation.
So if you are interested in doing this I would recommend doing a small
github project. We will definitely help promote it. Several people have
asked for it so I suspect you would definitely get some usage. I would also
love to hear how well that adaption works in practice--i.e. what percentage
of JMS features are supportable by Kafka.

-Jay

On Thu, Mar 5, 2015 at 6:30 PM, Joshi, Rekha rekha_jo...@intuit.com wrote:

 Hi,

 Kafka is a great alternative to JMS, providing high performance and
 throughput as a scalable, distributed pub-sub/commit log service.

 However, there will always be traditional systems running on JMS.
 Rather than rewriting, it would be great if we just had an inbuilt
 JMSAdaptor/JMSProxy/JMSBridge by which a client can speak JMS but hit Kafka
 behind the scenes.
 Something like Chukwa's
 o.a.h.chukwa.datacollection.adaptor.jms.JMSAdaptor, which receives messages
 off a JMS queue and transforms them into a Chukwa chunk?

 I have come across folks talking of this need in the past as well. Is it
 considered and/or part of the roadmap?
 http://grokbase.com/t/kafka/users/131cst8xpv/stomp-binding-for-kafka

 http://grokbase.com/t/kafka/users/148dm4247q/consuming-messages-from-kafka-and-pushing-on-to-a-jms-queue

 http://grokbase.com/t/kafka/users/143hjepbn2/request-kafka-zookeeper-jms-details

 Looking for input on the correct way to approach this, so as to retain all
 the good features of Kafka while not rewriting the entire application. Possible?

 Thanks
 Rekha



Re: Database Replication Question

2015-03-05 Thread Jay Kreps
Hey Xiao,

That's not quite right. Fsync is controlled by either a time-based criterion
(flush every 30 seconds) or a number-of-messages criterion. So if you set
the number of messages to 1 the flush is synchronous with the write, which
I think is what you are looking for.
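
Roughly, the two criteria combine like this. This is a simplified sketch, not the broker's log code: the class and field names are made up, and the time check is evaluated on append here, whereas a real implementation may drive it from a background scheduler.

```java
/** Simplified flush policy: flush when M messages are pending or S milliseconds have passed. */
final class FlushPolicySketch {
    private final long maxMessages;   // M: flush after this many unflushed messages
    private final long maxIntervalMs; // S: flush after this much time since the last flush
    private long unflushed = 0;
    private long lastFlushMs;

    FlushPolicySketch(long maxMessages, long maxIntervalMs, long nowMs) {
        this.maxMessages = maxMessages;
        this.maxIntervalMs = maxIntervalMs;
        this.lastFlushMs = nowMs;
    }

    /** Records one appended message and returns true if a flush (fsync) is due now. */
    boolean onAppend(long nowMs) {
        unflushed++;
        boolean due = unflushed >= maxMessages || nowMs - lastFlushMs >= maxIntervalMs;
        if (due) {
            unflushed = 0;
            lastFlushMs = nowMs;
        }
        return due;
    }

    public static void main(String[] args) {
        // With M = 1 every append is followed by a flush, i.e. synchronous with the write.
        FlushPolicySketch everyMessage = new FlushPolicySketch(1, 30_000, 0);
        System.out.println(everyMessage.onAppend(5));
        // With M = 3 the first append does not trigger a flush.
        FlushPolicySketch batched = new FlushPolicySketch(3, 30_000, 0);
        System.out.println(batched.onAppend(5));
    }
}
```
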

-Jay

On Thu, Mar 5, 2015 at 1:17 AM, Xiao lixiao1...@gmail.com wrote:

 Hey, Jay,

 Thank you for your answer!

 Based on my understanding, Kafka fsync is regularly issued by a dedicated
 helper thread.  It is not issued based on the semantics. The producers are
 unable to issue a COMMIT to trigger fsync.

 Not sure if this requirement is highly desirable to the others too?

 Night,

 Xiao Li

 On Mar 4, 2015, at 9:00 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Xiao,
 
  Yeah I agree that without fsync you will not get durability in the case
 of
  a power outage or other correlated failure, and likewise without
  replication you won't get durability in the case of disk failure.
 
  If each batch is fsync'd it will definitely be slower, depending on the
  capability of the disk subsystem. Either way that feature is there now.
 
  -Jay
 
  On Wed, Mar 4, 2015 at 8:50 AM, Xiao lixiao1...@gmail.com wrote:
 
  Hey Jay,
 
  Yeah. I understood the advantage of Kafka is one to many. That is why I
 am
  reading the source codes of Kafka. Your guys did a good product! : )
 
  Our major concern is its message persistency. Zero data loss is a must
 in
  our applications. Below is what I copied from the Kafka document.
 
  The log takes two configuration parameter M which gives the number of
  messages to write before forcing the OS to flush the file to disk, and S
  which gives a number of seconds after which a flush is forced. This
 gives a
  durability guarantee of losing at most M messages or S seconds of data
 in
  the event of a system crash.
 
  Basically, our producers needs to know if the data have been
  flushed/fsynced to the disk. Our model is disconnected. Producers and
  consumers do not talk with each other. The only media is a Kafka-like
  persistence message queue.
 
  Unplanned power outage is not rare in 24/7 usage. Any data loss could
  cause a very expensive full refresh. That is not acceptable for many
  financial companies.
 
  If we do fsync for each transaction or each batch, the throughput could
 be
  low? Or another way is to let our producers check recovery points very
  frequently, and then the performance bottleneck will be on
 reading/copying
  the recovery-point file. Any other ideas?
 
  I have not read the source codes for synchronous disk replication. That
  will be my next focus. I am not sure if that can resolve our above
 concern.
 
  BTW, do you have any plan to support mainframe?
 
  Thanks,
 
  Xiao Li
 
 
  On Mar 4, 2015, at 8:01 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Hey Xiao,
 
  1. Nothing prevents applying transactions transactionally on the
  destination side, though that is obviously more work. But I think the key
  point here is that much of the time the replication is not Oracle=>Oracle,
  but Oracle=>{W, X, Y, Z} where W/X/Y/Z are totally heterogeneous systems
  that aren't necessarily RDBMSs.
 
  2. I don't think fsync is really relevant. You can fsync on every message
  if you like, but Kafka's durability guarantees don't depend on this as it
  allows synchronous commit across replicas. This changes the guarantee from
  "won't be lost unless the disk dies" to "won't be lost unless all replicas
  die", but the latter is generally a stronger guarantee in practice given
  the empirical reliability of disks (#1 reason for server failure in my
  experience was disk failure).
 
  -Jay
 
  On Tue, Mar 3, 2015 at 4:23 PM, Xiao lixiao1...@gmail.com wrote:
 
  Hey Josh,
 
  If you put different tables into different partitions or topics, it
  might
  break transaction ACID at the target side. This is risky for some use
  cases. Besides unit of work issues, you also need to think about the
  load
  balancing too.
 
  For failover, you have to find the timestamp for point-in-time
  consistency. This part is tricky. You have to ensure all the changes
  before
  a specific timestamp have been flushed to the disk. Normally, you can
  maintain a bookmark for different partition at the target side to know
  what
  is the oldest transactions have been flushed to the disk.
 Unfortunately,
  based on my understanding, Kafka is unable to do it because it does
 not
  do
  fsync regularly for achieving better throughput.
 
  Best wishes,
 
  Xiao Li
 
 
  On Mar 3, 2015, at 3:45 PM, Xiao lixiao1...@gmail.com wrote:
 
  Hey Josh,
 
  Transactions can be applied in parallel in the consumer side based on
  transaction dependency checking.
 
  http://www.google.com.ar/patents/US20080163222
 
  This patent documents how it works. It is easy to understand; however,
  you also need to consider the hash collision issues. This has been
  implemented in IBM Q Replication since 2001.
 
  Thanks,
 
  Xiao Li

Re: Database Replication Question

2015-03-04 Thread Jay Kreps
Hey Josh,

NoSQL DBs may actually be easier because they themselves generally don't
have a global order. I.e. I believe Mongo has a per-partition oplog, is
that right? Their partitions would match our partitions.

-Jay

On Wed, Mar 4, 2015 at 5:18 AM, Josh Rader jrader...@gmail.com wrote:

 Thanks everyone for your responses!  These are great.  It seems our case
 most closely matches Jay's recommendations.

 The one part that sounds a little tricky is point #5 'Include in each
 message the database's transaction id, scn, or other identifier '.  This is
 pretty straightforward with the RDBMS case that I mentioned, but I could
 see wanting to extend this to replicate NoSQL stores (Cassandra, Mongo)
 which might not always have a readily available monotonic id, particularly
 in failover scenarios.  I guess in that case we can think about creating
 this id ourselves from the single producer.

 Xiao,

 I think in the Kafka failover cases you mention if we also store the offset
 with replicated data we should be able to pick up where we left off since
 we are using the low level consumer.  Maybe I am missing your point
 though...

 Guozhang,

 Very good point that we didn't think of.  We will need to think this
 through and, as you say, avoid resending other messages in a batch if one
 fails.  I wonder if we might also manage this on the consumer side with
 idempotency.  Thanks for raising this!

 Josh



 On Tue, Mar 3, 2015 at 6:08 PM, Xiao lixiao1...@gmail.com wrote:

  Hey Josh,
 
  Sorry, after reading the code, I see that Kafka does fsync the data using a
  separate thread. The recovery point (oldest transaction timestamp) can be
  read from the file recovery-point-offset-checkpoint.
 
  You can adjust the value of config.logFlushOffsetCheckpointIntervalMs if you
  think it is not quick enough. When the workload is huge, the bottleneck
  could be on your target side or source side. That means your apply process
  could have enough work to do.
 
  Basically, you need to keep reading this file to determine the oldest
  timestamps of all relevant partitions. Then, apply the transactions up to
  that timestamp.
 
  Note, this does not protect transaction consistency. This is just for
  ensuring the data at the target side is consistent at one timestamp when
  you have multiple channels sending data changes. The implementation should
  be simple if you understand the concepts. I am unable to find the filed
  patent application about it. This is one related paper. It covers the main
  concepts behind the issues you are facing: "Inter-Data-Center Large-Scale
  Database Replication Optimization – A Workload Driven Partitioning Approach"
 
  Hopefully, you understood what I explained above.
 
  Best wishes,
 
  Xiao Li
 
  On Mar 3, 2015, at 4:23 PM, Xiao lixiao1...@gmail.com wrote:
 
   Hey Josh,
  
   If you put different tables into different partitions or topics, it
  might break transaction ACID at the target side. This is risky for some
 use
  cases. Besides unit of work issues, you also need to think about the load
  balancing too.
  
   For failover, you have to find the timestamp for point-in-time
  consistency. This part is tricky. You have to ensure all the changes
 before
  a specific timestamp have been flushed to the disk. Normally, you can
  maintain a bookmark for different partition at the target side to know
 what
  is the oldest transactions have been flushed to the disk. Unfortunately,
  based on my understanding, Kafka is unable to do it because it does not
 do
  fsync regularly for achieving better throughput.
  
   Best wishes,
  
   Xiao Li
  
  
   On Mar 3, 2015, at 3:45 PM, Xiao lixiao1...@gmail.com wrote:
  
   Hey Josh,
  
   Transactions can be applied in parallel in the consumer side based on
  transaction dependency checking.
  
   http://www.google.com.ar/patents/US20080163222
  
   This patent documents how it works. It is easy to understand; however,
  you also need to consider the hash collision issues. This has been
  implemented in IBM Q Replication since 2001.
  
   Thanks,
  
   Xiao Li
  
  
   On Mar 3, 2015, at 3:36 PM, Jay Kreps jay.kr...@gmail.com wrote:
  
   Hey Josh,
  
   As you say, ordering is per partition. Technically it is generally
  possible
   to publish all changes to a database to a single partition--generally
  the
   kafka partition should be high throughput enough to keep up. However
  there
   are a couple of downsides to this:
   1. Consumer parallelism is limited to one. If you want a total order
  to the
   consumption of messages you need to have just 1 process, but often
 you
   would want to parallelize.
   2. Often what people want is not a full stream of all changes in all
  tables
   in a database but rather the changes to a particular table.
  
   To some extent the best way to do this depends on what you will do
  with the
   data. However if you intend to have lots
  
   I have seen pretty much every variation

Re: Database Replication Question

2015-03-04 Thread Jay Kreps
Hey Xiao,

1. Nothing prevents applying transactions transactionally on the
destination side, though that is obviously more work. But I think the key
point here is that much of the time the replication is not Oracle=>Oracle,
but Oracle=>{W, X, Y, Z} where W/X/Y/Z are totally heterogeneous systems
that aren't necessarily RDBMSs.

2. I don't think fsync is really relevant. You can fsync on every message
if you like, but Kafka's durability guarantees don't depend on this as it
allows synchronous commit across replicas. This changes the guarantee from
"won't be lost unless the disk dies" to "won't be lost unless all replicas
die", but the latter is generally a stronger guarantee in practice given the
empirical reliability of disks (#1 reason for server failure in my
experience was disk failure).

-Jay

On Tue, Mar 3, 2015 at 4:23 PM, Xiao lixiao1...@gmail.com wrote:

 Hey Josh,

 If you put different tables into different partitions or topics, it might
 break transaction ACID at the target side. This is risky for some use
 cases. Besides unit-of-work issues, you also need to think about load
 balancing.

 For failover, you have to find the timestamp for point-in-time
 consistency. This part is tricky. You have to ensure all the changes before
 a specific timestamp have been flushed to the disk. Normally, you can
 maintain a bookmark for each partition at the target side to know which
 is the oldest transaction that has been flushed to the disk. Unfortunately,
 based on my understanding, Kafka is unable to do it because it does not do
 fsync regularly, in order to achieve better throughput.

 Best wishes,

 Xiao Li


 On Mar 3, 2015, at 3:45 PM, Xiao lixiao1...@gmail.com wrote:

  Hey Josh,
 
  Transactions can be applied in parallel in the consumer side based on
 transaction dependency checking.
 
  http://www.google.com.ar/patents/US20080163222
 
  This patent documents how it works. It is easy to understand; however,
 you also need to consider the hash collision issues. This has been
 implemented in IBM Q Replication since 2001.
 
  Thanks,
 
  Xiao Li
 
 
  On Mar 3, 2015, at 3:36 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Hey Josh,
 
  As you say, ordering is per partition. Technically it is generally
 possible
  to publish all changes to a database to a single partition--generally
 the
  kafka partition should be high throughput enough to keep up. However
 there
  are a couple of downsides to this:
  1. Consumer parallelism is limited to one. If you want a total order to
 the
  consumption of messages you need to have just 1 process, but often you
  would want to parallelize.
  2. Often what people want is not a full stream of all changes in all
 tables
  in a database but rather the changes to a particular table.
 
  To some extent the best way to do this depends on what you will do with
 the
  data. However if you intend to have lots
 
  I have seen pretty much every variation on this in the wild, and here is
  what I would recommend:
  1. Have a single publisher process that publishes events into Kafka
  2. If possible use the database log to get these changes (e.g. mysql
  binlog, Oracle xstreams, golden gate, etc). This will be more complete
 and
  more efficient than polling for changes, though that can work too.
  3. Publish each table to its own topic.
  4. Partition each topic by the primary key of the table.
  5. Include in each message the database's transaction id, scn, or other
  identifier that gives the total order within the record stream. Since
 there
  is a single publisher this id will be monotonic within each partition.
 
  This seems to be the best set of tradeoffs for most use cases:
  - You can have parallel consumers up to the number of partitions you
 chose
  that still get messages in order per ID'd entity.
  - You can subscribe to just one table if you like, or to multiple
 tables.
  - Consumers who need a total order over all updates can do a merge
 across
  the partitions to reassemble the fully ordered set of changes across all
  tables/partitions.
 
  One thing to note is that the requirement of having a single consumer
  process/thread to get the total order isn't really so much a Kafka
  restriction as it just is a restriction about the world, since if you had
  multiple threads, even if you delivered messages to them in order, their
  processing might happen out of order (just due to the random timing of the
  processing).
 
  -Jay
 
 
 
  On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader jrader...@gmail.com wrote:
 
  Hi Kafka Experts,
 
 
 
  We have a use case around RDBMS replication where we are investigating
  Kafka.  In this case ordering is very important.  Our understanding is
  ordering is only preserved within a single partition.  This makes
 sense as
  a single thread will consume these messages, but our question is can we
  somehow parallelize this for better performance?   Is there maybe some
  partition key strategy trick to have your cake and eat it too in terms

Re: Database Replication Question

2015-03-04 Thread Jay Kreps
Hey Xiao,

Yeah I agree that without fsync you will not get durability in the case of
a power outage or other correlated failure, and likewise without
replication you won't get durability in the case of disk failure.

If each batch is fsync'd it will definitely be slower, depending on the
capability of the disk subsystem. Either way that feature is there now.

-Jay

On Wed, Mar 4, 2015 at 8:50 AM, Xiao lixiao1...@gmail.com wrote:

 Hey Jay,

 Yeah. I understand the advantage of Kafka is one-to-many. That is why I am
 reading the source code of Kafka. You guys built a good product! : )

 Our major concern is its message persistence. Zero data loss is a must in
 our applications. Below is what I copied from the Kafka documentation.

 "The log takes two configuration parameters: M, which gives the number of
 messages to write before forcing the OS to flush the file to disk, and S,
 which gives a number of seconds after which a flush is forced. This gives a
 durability guarantee of losing at most M messages or S seconds of data in
 the event of a system crash."

 Basically, our producers need to know whether the data has been
 flushed/fsynced to the disk. Our model is disconnected. Producers and
 consumers do not talk with each other. The only medium is a Kafka-like
 persistent message queue.

 Unplanned power outages are not rare in 24/7 usage. Any data loss could
 cause a very expensive full refresh. That is not acceptable for many
 financial companies.

 If we do fsync for each transaction or each batch, could the throughput be
 low? Another way is to let our producers check recovery points very
 frequently, but then the performance bottleneck will be reading/copying
 the recovery-point file. Any other ideas?

 I have not read the source code for synchronous disk replication. That
 will be my next focus. I am not sure if that can resolve our above concern.

 BTW, do you have any plan to support mainframe?

 Thanks,

 Xiao Li


 On Mar 4, 2015, at 8:01 AM, Jay Kreps jay.kr...@gmail.com wrote:

  Hey Xiao,
 
  1. Nothing prevents applying transactions transactionally on the
  destination side, though that is obviously more work. But I think the key
  point here is that much of the time the replication is not Oracle=>Oracle,
  but Oracle=>{W, X, Y, Z} where W/X/Y/Z are totally heterogeneous systems
  that aren't necessarily RDBMSs.
 
  2. I don't think fsync is really relevant. You can fsync on every message
  if you like, but Kafka's durability guarantees don't depend on this as it
  allows synchronous commit across replicas. This changes the guarantee from
  "won't be lost unless the disk dies" to "won't be lost unless all replicas
  die", but the latter is generally a stronger guarantee in practice given
  the empirical reliability of disks (#1 reason for server failure in my
  experience was disk failure).
 
  -Jay
 
  On Tue, Mar 3, 2015 at 4:23 PM, Xiao lixiao1...@gmail.com wrote:
 
  Hey Josh,
 
  If you put different tables into different partitions or topics, it
 might
  break transaction ACID at the target side. This is risky for some use
  cases. Besides unit of work issues, you also need to think about the
 load
  balancing too.
 
  For failover, you have to find the timestamp for point-in-time
  consistency. This part is tricky. You have to ensure all the changes
 before
  a specific timestamp have been flushed to the disk. Normally, you can
  maintain a bookmark for different partition at the target side to know
 what
  is the oldest transactions have been flushed to the disk. Unfortunately,
  based on my understanding, Kafka is unable to do it because it does not
 do
  fsync regularly for achieving better throughput.
 
  Best wishes,
 
  Xiao Li
 
 
  On Mar 3, 2015, at 3:45 PM, Xiao lixiao1...@gmail.com wrote:
 
  Hey Josh,
 
  Transactions can be applied in parallel in the consumer side based on
  transaction dependency checking.
 
  http://www.google.com.ar/patents/US20080163222
 
  This patent documents how it works. It is easy to understand; however,
  you also need to consider the hash collision issues. This has been
  implemented in IBM Q Replication since 2001.
 
  Thanks,
 
  Xiao Li
 
 
  On Mar 3, 2015, at 3:36 PM, Jay Kreps jay.kr...@gmail.com wrote:
 
  Hey Josh,
 
  As you say, ordering is per partition. Technically it is generally
  possible
  to publish all changes to a database to a single partition--generally
  the
  kafka partition should be high throughput enough to keep up. However
  there
  are a couple of downsides to this:
  1. Consumer parallelism is limited to one. If you want a total order
 to
  the
  consumption of messages you need to have just 1 process, but often you
  would want to parallelize.
  2. Often what people want is not a full stream of all changes in all
  tables
  in a database but rather the changes to a particular table.
 
  To some extent the best way to do this depends on what you will do
 with
  the
  data. However if you intend to have lots
 
  I have seen

Re: publisher spooling ....!

2015-03-03 Thread Jay Kreps
Broker replication is available now and fully documented in the docs. This
approach to availability has a lot of advantages discussed in that ticket
and the one below. Personally, having tried both approaches, I think this
is what most people should do (running a small highly available cluster
well is easier than managing 10x as many persistent stores spread across
all your application nodes).

There is, however, a prototype patch to add limited persistence to the
producer. You can see it here:
https://issues.apache.org/jira/browse/KAFKA-1955

-Jay

On Tue, Mar 3, 2015 at 8:15 AM, sunil kalva kalva.ka...@gmail.com wrote:

 Hi
 Is there any way to spool messages to disk on the publisher side when the
 Kafka cluster is down or not reachable by the publisher? If Kafka doesn't
 support this feature, what is the best practice for handling this failure
 scenario?

 I was referring to one of the old JIRA tickets, which is still open:
 https://issues.apache.org/jira/browse/KAFKA-156. Is there any plan to
 support this in the future, or are there any alternatives?

 What is the broker replication someone mentioned in the same JIRA? Is there
 any documentation for that?

 please advise me on this, my entire design depends on this feature.

 t
 Sunil Kalva



Re: Database Replication Question

2015-03-03 Thread Jay Kreps
Hey Josh,

As you say, ordering is per partition. Technically it is generally possible
to publish all changes to a database to a single partition--generally the
kafka partition should be high throughput enough to keep up. However there
are a couple of downsides to this:
1. Consumer parallelism is limited to one. If you want a total order to the
consumption of messages you need to have just 1 process, but often you
would want to parallelize.
2. Often what people want is not a full stream of all changes in all tables
in a database but rather the changes to a particular table.

To some extent the best way to do this depends on what you will do with the
data. However if you intend to have lots

I have seen pretty much every variation on this in the wild, and here is
what I would recommend:
1. Have a single publisher process that publishes events into Kafka
2. If possible use the database log to get these changes (e.g. MySQL
binlog, Oracle XStream, GoldenGate, etc). This will be more complete and
more efficient than polling for changes, though that can work too.
3. Publish each table to its own topic.
4. Partition each topic by the primary key of the table.
5. Include in each message the database's transaction id, scn, or other
identifier that gives the total order within the record stream. Since there
is a single publisher this id will be monotonic within each partition.

This seems to be the best set of tradeoffs for most use cases:
- You can have parallel consumers up to the number of partitions you chose
that still get messages in order per ID'd entity.
- You can subscribe to just one table if you like, or to multiple tables.
- Consumers who need a total order over all updates can do a merge across
the partitions to reassemble the fully ordered set of changes across all
tables/partitions.
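
As a rough illustration of the last point, the merge across partitions can be sketched as a k-way merge keyed on the transaction id from recommendation 5. This is only a sketch under assumptions: the `Change` type and its `txnId` field are hypothetical, each partition is modeled as an in-memory list that is already ordered by that id, and a real consumer would have to wait for data from every partition before emitting.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class TotalOrderMerge {
    /** Hypothetical change event; txnId stands in for the database transaction id or SCN. */
    public static final class Change {
        public final long txnId;
        public final String payload;

        public Change(long txnId, String payload) {
            this.txnId = txnId;
            this.payload = payload;
        }
    }

    /** Merges per-partition streams (each already sorted by txnId) into one ordered list. */
    public static List<Change> merge(List<List<Change>> partitions) {
        // Each heap entry is {partition index, position within that partition}.
        Comparator<int[]> byTxnId =
            Comparator.comparingLong(e -> partitions.get(e[0]).get(e[1]).txnId);
        PriorityQueue<int[]> heap = new PriorityQueue<>(byTxnId);
        for (int p = 0; p < partitions.size(); p++) {
            if (!partitions.get(p).isEmpty()) {
                heap.add(new int[] {p, 0});
            }
        }
        List<Change> ordered = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] head = heap.poll();
            List<Change> partition = partitions.get(head[0]);
            ordered.add(partition.get(head[1]));
            // Advance within the partition the emitted change came from.
            if (head[1] + 1 < partition.size()) {
                heap.add(new int[] {head[0], head[1] + 1});
            }
        }
        return ordered;
    }
}
```

Changes that share a transaction id come out in arbitrary relative order here; a consumer that applies whole transactions would group them by id first.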

One thing to note is that the requirement of having a single consumer
process/thread to get the total order isn't really so much a Kafka
restriction as it just is a restriction about the world, since if you had
multiple threads, even if you delivered messages to them in order, their
processing might happen out of order (just due to the random timing of the
processing).

-Jay



On Tue, Mar 3, 2015 at 3:15 PM, Josh Rader jrader...@gmail.com wrote:

 Hi Kafka Experts,



 We have a use case around RDBMS replication where we are investigating
 Kafka.  In this case ordering is very important.  Our understanding is
 ordering is only preserved within a single partition.  This makes sense as
 a single thread will consume these messages, but our question is can we
 somehow parallelize this for better performance?   Is there maybe some
 partition key strategy trick to have your cake and eat it too in terms of
 keeping ordering, but also able to parallelize the processing?



 I am sorry if this has already been asked, but we tried to search through
 the archives and couldn’t find this response.



 Thanks,

 Josh



Re: kafka producer does not distribute messages to partitions evenly?

2015-03-02 Thread Jay Kreps
FWIW, this intensely confusing behavior is fixed in the new producer which
should give the expected result by default.
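
For readers trying to picture the old behavior discussed in the quoted thread, here is a small Java simulation of a "sticky" random choice: a partition is picked at random, cached, and only re-picked after a refresh interval. The class is hypothetical and only models the effect; it is not the producer's code, and the 10-minute interval is taken from the thread rather than verified here.

```java
import java.util.Random;

/** Hypothetical model of a random partition choice that sticks until a refresh interval passes. */
public class StickyRandomPartitioner {
    private final int numPartitions;
    private final long refreshMs;
    private final Random random;
    private int cached = -1;   // -1 means nothing cached yet
    private long cachedAtMs;

    public StickyRandomPartitioner(int numPartitions, long refreshMs, long seed) {
        this.numPartitions = numPartitions;
        this.refreshMs = refreshMs;
        this.random = new Random(seed);
    }

    /** Returns the partition for a keyless message sent at time nowMs. */
    public int partition(long nowMs) {
        if (cached < 0 || nowMs - cachedAtMs >= refreshMs) {
            // Only here is a new random partition drawn; every other send reuses the cache.
            cached = random.nextInt(numPartitions);
            cachedAtMs = nowMs;
        }
        return cached;
    }
}
```

With a 10-minute refresh, a one-hour test from a single producer draws a partition at most six times, so with 10 partitions at least four stay empty, which is consistent with what Yang observed.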

-Jay

On Mon, Mar 2, 2015 at 6:36 PM, Yang tedd...@gmail.com wrote:

 Thanks. This is indeed the reason.
 On Mar 2, 2015 4:38 PM, Christian Csar christ...@csar.us wrote:

  I believe you are seeing the behavior where the random partitioner is
  sticky.
 
 
 http://mail-archives.apache.org/mod_mbox/kafka-users/201309.mbox/%3ccahwhrrxax5ynimqnacsk7jcggnhjc340y4qbqoqcismm43u...@mail.gmail.com%3E
  has details. So with the default 10 minute refresh if your test is only
 an
  hour or two with a single producer you would not expect to see all
  partitions be hit.
 
  Christian
 
  On Mon, Mar 2, 2015 at 4:23 PM, Yang tedd...@gmail.com wrote:
 
   thanks. just checked code below. in the code below, the line that calls
   Random.nextInt() seems to be called only *a few times* , and all the
 rest
   of the cases getPartition() is called, the
   cached sendPartitionPerTopicCache.get(topic) seems to be called, so
   apparently you won't get an even partition distribution ?
  
   the code I got is from commit 7847e9c703f3a0b70519666cdb8a6e4c8e37c3a7
  
  
   ./core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
  
   private def getPartition(topic: String, key: Any, topicPartitionList: Seq[PartitionAndLeader]): Int = {
     val numPartitions = topicPartitionList.size
     if(numPartitions <= 0)
       throw new UnknownTopicOrPartitionException("Topic " + topic + " doesn't exist")
     val partition =
       if(key == null) {
         // If the key is null, we don't really need a partitioner
         // So we look up in the send partition cache for the topic to decide the target partition
         val id = sendPartitionPerTopicCache.get(topic)
         id match {
           case Some(partitionId) =>
             // directly return the partitionId without checking availability of the leader,
             // since we want to postpone the failure until the send operation anyways
             partitionId
           case None =>
             val availablePartitions = topicPartitionList.filter(_.leaderBrokerIdOpt.isDefined)
             if (availablePartitions.isEmpty)
               throw new LeaderNotAvailableException("No leader for any partition in topic " + topic)
             val index = Utils.abs(Random.nextInt) % availablePartitions.size
             val partitionId = availablePartitions(index).partitionId
             sendPartitionPerTopicCache.put(topic, partitionId)
             partitionId
         }
       } else
         partitioner.partition(key, numPartitions)
     if(partition < 0 || partition >= numPartitions)
       throw new UnknownTopicOrPartitionException("Invalid partition id: " + partition + " for topic " + topic +
         "; Valid values are in the inclusive range of [0, " + (numPartitions-1) + "]")
     trace("Assigning message of topic %s and key %s to a selected partition %d".format(topic, if (key == null) "[none]" else key.toString, partition))
     partition
   }
  
  
   On Mon, Mar 2, 2015 at 3:58 PM, Mayuresh Gharat 
   gharatmayures...@gmail.com
   wrote:
  
Probably your keys are getting hashed to only those partitions. I don't
think anything is wrong here.
You can check how the default hashPartitioner is used in the code and try
to do the same for your keys before you send them, and check which
partitions they go to.
   
The default hashpartitioner does something like this :
   
hash(key) % numPartitions.
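
The check suggested above can be done offline with a few lines of Java. This is a sketch under assumptions: it uses `hashCode()` with the sign bit masked as the hash, which may differ from what a given producer version actually uses, and the class and method names are hypothetical.

```java
import java.util.List;

public class HashPartitionCheck {
    /** hash(key) % numPartitions, with the sign bit masked so the result is never negative. */
    public static int partitionFor(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    /** Counts how many of the given keys land on each partition. */
    public static int[] histogram(List<String> keys, int numPartitions) {
        int[] counts = new int[numPartitions];
        for (String key : keys) {
            counts[partitionFor(key, numPartitions)]++;
        }
        return counts;
    }
}
```

Running `histogram` over a sample of real keys shows quickly whether a small key space simply maps to a handful of partitions.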
   
Thanks,
   
Mayuresh
   
On Mon, Mar 2, 2015 at 3:52 PM, Yang tedd...@gmail.com wrote:
   
 we have 10 partitions for a topic, and omit the explicit partition param in
 the message creation:

 KeyedMessage<String, String> data =
     new KeyedMessage<String, String>(mytopic, myMessageContent);   // partition key need to be polished
 producer.send(data);



 but on average 3--5 of the partitions are empty.



 what went wrong?

 thanks
 Yang

   
   
   
--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
   
  
 



Re: Kafka 0.8.2 log cleaner

2015-03-01 Thread Jay Kreps
They are mutually exclusive. Can you expand on the motivation/use for
combining them?

-Jay

On Sunday, March 1, 2015, Ivan Balashov ibalas...@gmail.com wrote:

 Hi,

 Do I understand correctly that compaction and deletion are currently
 mutually exclusive?

 Is it possible to compact recent segments and delete older ones,
 according to general deletion policies?

 Thanks,


 2014-11-30 15:10 GMT+03:00 Manikumar Reddy ku...@nmsworks.co.in:
  Log cleaner does not support topics with compressed messages.
 
  https://issues.apache.org/jira/browse/KAFKA-1374
 
  On Sun, Nov 30, 2014 at 5:33 PM, Mathias Söderberg
  mathias.soederb...@gmail.com wrote:
 
  Does the log cleaner in 0.8.2 support topics with compressed messages?
  IIRC that wasn't supported in 0.8.1.1.
 
  On 29 November 2014 at 17:23, Jun Rao jun...@gmail.com wrote:
 
   Yes, log cleaner is in 0.8.2. You just need to set the retention policy
   of a topic to compact.
  
   Thanks,
  
   Jun
  
   On Thu, Nov 27, 2014 at 5:20 AM, Khandygo, Evgeny (EXT)
   evgeny.khandygo@siemens.com wrote:
  
I’m wondering if you could tell me whether the log cleaner is implemented in
0.8.2, because it seems like it isn't.
   
Thanks
John
   
   
  
 



Re: Unlimited Log Retention

2015-02-28 Thread Jay Kreps
It is totally reasonable to have unlimited retention. We don't have an
explicit setting for this but you can set the time based retention policy
to something large
  log.retention.hours=2147483647
which will retain the log for 245,146 years. :-)
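
The figure follows from plain arithmetic on the largest 32-bit integer, assuming 365-day years. A tiny Java check (the class name is just for illustration):

```java
public class RetentionYears {
    /** Converts a retention period in hours to whole 365-day years. */
    public static long yearsForHours(long hours) {
        return hours / (24L * 365L);
    }

    public static void main(String[] args) {
        // Integer.MAX_VALUE is 2147483647, the value used for log.retention.hours above.
        System.out.println(yearsForHours(Integer.MAX_VALUE)); // prints 245146
    }
}
```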

-Jay

On Fri, Feb 27, 2015 at 4:12 PM, Warren Kiser war...@hioscar.com wrote:

 Does anyone know how to achieve unlimited log retention either globally or
 on a per topic basis? I tried explicitly setting the log.retention.bytes to
 -1 but the default time policy kicked in after 7 days and cleaned up the
 messages.

 Thanks!

 Warren



Re: Tips for working with Kafka and data streams

2015-02-25 Thread Jay Kreps
Hey Christian,

That makes sense. I agree that would be a good area to dive into. Are you
primarily interested in network level security or encryption on disk?

-Jay

On Wed, Feb 25, 2015 at 1:38 PM, Christian Csar christ...@csar.us wrote:

 I wouldn't say no to some discussion of encryption. We're running on Azure
 EventHubs (with preparations for Kinesis for EC2, and Kafka for deployments
 in customer datacenters when needed) so can't just use disk level
 encryption (which would have its own overhead). We're putting all of our
 messages inside of encrypted envelopes before sending them to the stream
 which limits our opportunities for schema verification of the underlying
 messages to the declared type of the message.

 Encryption at rest mostly works out to a sales point for customers who want
 assurances, and in a Kafka focused discussion might be dealt with by
 covering disk encryption and how the conversations between Kafka instances
 are protected.

 Christian


 On Wed, Feb 25, 2015 at 11:51 AM, Jay Kreps j...@confluent.io wrote:

  Hey guys,
 
  One thing we tried to do along with the product release was start to put
  together a practical guide for using Kafka. I wrote this up here:
  http://blog.confluent.io/2015/02/25/stream-data-platform-1/
 
  I'd like to keep expanding on this as good practices emerge and we learn
  more stuff. So two questions:
  1. Anything you think other people should know about working with data
  streams? What did you wish you knew when you got started?
  2. Anything you don't know about but would like to hear more about?
 
  -Jay
 



Re: Anyone interested in speaking at Bay Area Kafka meetup @ LinkedIn on March 24?

2015-02-23 Thread Jay Kreps
+1

I think something like "Kafka on AWS at Netflix" would be hugely
interesting to a lot of people.

-Jay

On Mon, Feb 23, 2015 at 3:02 PM, Allen Wang aw...@netflix.com.invalid
wrote:

 We (Steven Wu and Allen Wang) can talk about Kafka use cases and operations
 in Netflix. Specifically, we can talk about how we scale and operate Kafka
 clusters in AWS and how we migrate our data pipeline to Kafka.

 Thanks,
 Allen


 On Mon, Feb 23, 2015 at 12:15 PM, Ed Yakabosky 
 eyakabo...@linkedin.com.invalid wrote:

  Hi Kafka Open Source -
 
  LinkedIn will host another Bay Area Kafka meetup in Mountain View on
  March 24.  We are planning to present on Offset Management but are
  looking for additional speakers.  If you’re interested in presenting a
  use case, operational plan, or your experience with a particular feature
  (REST interface, WebConsole), please reply-all to let us know.
 
  [BCC: Open Source lists]
 
  Thanks,
  Ed
 



Re: New Producer - Is the configurable partitioner gone?

2015-02-22 Thread Jay Kreps
Hey Daniel,

Yeah I think that would be doable. If you want to pursue it you would need
to do a quick KIP just to get everyone on the same page since this would be
a public interface we would have to support over a long time:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

When we have the details worked out, then it should be a fairly
straight-forward patch to make that pluggable.

A few comments:
- I think we should just make the DefaultPartitioner the default value for
that configuration, rather than having it be a fall back.
- You need to pass in the binary key and value in addition to the java
objects. Otherwise any partitioning based on the binary value will require
reserializing these.
- If we add this option we should really ship at least one other useful
partitioning strategy. The low connection partitioner might work for this
by attempting to reuse recently used nodes whenever possible. That is
useful in environments with lots and lots of producers where you don't care
about semantic partitioning. It would be good to think through if there are
any other useful partitioning strategies to make sure they would also be
doable with the interface we would end up with.
- Currently Cluster is not a public class so we'll have to think about
whether we want to make that public.

-Jay
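
To make the comments above concrete, here is a rough sketch of what a
pluggable partitioner could look like. Everything in it is an assumption
for illustration: the Partitioner interface, its method signature, and both
class names are hypothetical and are not Kafka's API. It passes the
serialized key and value alongside the Java objects (so nothing has to be
reserialized) and takes a plain partition count instead of Cluster, since
Cluster is not public. The StickyPartitioner only approximates the "low
connection" idea by keeping unkeyed records on one partition for a while.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class PartitionerSketch {

    /** Hypothetical plug-in interface for this sketch; not Kafka's API. */
    public interface Partitioner {
        int partition(String topic, Object key, byte[] keyBytes,
                      Object value, byte[] valueBytes, int numPartitions);
    }

    /** Hashes the already-serialized key, so nothing is reserialized. */
    public static class HashPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, int numPartitions) {
            // Mask the sign bit so the modulo result is never negative.
            return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
        }
    }

    /** Keeps unkeyed records on one partition for a fixed number of sends. */
    public static class StickyPartitioner implements Partitioner {
        private final int recordsPerPartition;
        private final Partitioner keyed = new HashPartitioner();
        private int current = 0;
        private int sentToCurrent = 0;

        public StickyPartitioner(int recordsPerPartition) {
            // Guard against a zero or negative quota.
            this.recordsPerPartition = Math.max(1, recordsPerPartition);
        }

        @Override
        public synchronized int partition(String topic, Object key, byte[] keyBytes,
                                          Object value, byte[] valueBytes, int numPartitions) {
            if (keyBytes != null) {
                // Keyed records keep their semantic partitioning.
                return keyed.partition(topic, key, keyBytes, value, valueBytes, numPartitions);
            }
            if (sentToCurrent == recordsPerPartition) {
                // Quota used up: move on to the next partition.
                current = (current + 1) % numPartitions;
                sentToCurrent = 0;
            }
            sentToCurrent++;
            return current % numPartitions;
        }
    }

    public static void main(String[] args) {
        Partitioner sticky = new StickyPartitioner(3);
        byte[] valueBytes = "payload".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < 6; i++) {
            int p = sticky.partition("events", null, null, "payload", valueBytes, 4);
            System.out.println("record " + i + " -> partition " + p);
        }
    }
}
```

With an interface of this shape, the default hashing strategy would simply
be the default value of the configuration, and an alternative such as the
sticky one could be enabled purely in config.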


On Sun, Feb 22, 2015 at 4:44 AM, Daniel Wegener 
daniel.wege...@holisticon.de wrote:


 Jay Kreps jay.kreps@... writes:

 
  Hey Daniel,
 
  partitionsFor() will block the very first time it sees a new topic that it
  doesn't have metadata for yet. If you want to ensure you don't block even
  that one time, call it prior to your regular usage so it initializes then.

  The rationale for adding a partition in ProducerRecord was that there are
  actually cases where you want to associate a key with the record but not
  use it for partitioning. The rationale for not retaining the pluggable
  partitioner was that you could always just use the partition (many people
  dislike the plugin apis and asked for it). Personally, like you, I
  preferred the plugin apis.

  We aren't cut off from exposing the existing partitioner usage as a public
  api that you can override if there is anyone who wants that. I think one
  nice thing about it would be the ability to ship with an alternative
  partitioning strategy that you could enable purely in config. For example
  the old producer had a partitioning strategy that attempted to minimize the
  number of TCP connections for cases where there was no key. 98% of people
  loathed that, but for 2% of people it was useful and this would be a way to
  still include that behavior for people migrating to the new producer.
 
  -Jay
 
  On Sat, Feb 21, 2015 at 3:34 PM, Daniel Wegener 
  daniel.wegener@... wrote:
 
   Gwen Shapira gshapira at ... writes:
  
   
    Hi Daniel,

    I think you can still use the same logic you had in the custom partitioner
    in the old producer. You just move it to the client that creates the
    records.
    The reason you don't cache the result of partitionsFor is that the producer
    should handle the caching for you, so it's not necessarily a long or
    blocking call.

    I see it as a pretty small change to the API. But I'm not sure what drove
    the change either.

    Gwen

    On Fri, Feb 20, 2015 at 9:19 AM, Daniel Wegener
    Daniel.Wegener at ... wrote:
   
     Hello Kafka-users!

     I am facing a migration from a (somewhat self-plumbed) Kafka 0.8.1
     producer to the new kafka-clients API. I just noticed that the new
     KafkaProducer initializes its own Partitioner that cannot be changed
     (final field, no ctor-param, no
     Class.forName(config.getPartitionerClassNameFoo()).newInstance()). Is
     this an intentional change?
     If I understand the new API correctly, one should either define a key
     for a message and let the default Partitioner take care of distributing
     it over all available partitions, or set an explicit partition number
     per message that it will be written to.

     The old API allowed creating ProducerRecords with a key and/or a key
     used only for partitioning (but one that is not sent down the wire) and
     then providing a custom Partitioner that later could distribute this
     partitioning key over all available partitions when the message is
     actually sent.

     The difference in the new producer API is that we need to know the
     exact number of available partitions before we even create a
     ProducerRecord. If we don't ensure the correct number of partitions and
     try to send a message to a partition that does not exist, the whole
     message will blow up later when the producer tries to send it.

     I don't expect the partition count to change that often, but the
     API doc states that a partitionsFor(String topic) result _should not_

Re: High CPU usage of Crc32 on Kafka broker

2015-02-22 Thread Jay Kreps
Here's my summary of the state of the compression discussion:

   1. We all agree that current compression performance isn't very good and
   it would be nice to improve it.
   2. This is not entirely due to actual (de)compression; in large part it
   is due to inefficiencies in the current implementation. Snappy is like
   300Mb/sec/core so should not be a bottleneck. We could probably hugely
   improve performance without any fundamental changes. See:
   https://issues.apache.org/jira/browse/KAFKA-527
   3. There are really three separate things that get conflated:
      1. De-compression on the server
      2. Re-compression on the server
      3. De-compression and re-compression in mirror maker
   4. Getting rid of de-compression on the server is unlikely to happen
   because de-compression is required to validate the data sent. In the very
   early days of Kafka we did indeed just append whatever the client sent us
   to the binary log without validation. Then we realized that any bug in any
   of the clients in all the languages would potentially corrupt the log and
   potentially thus bring down the whole cluster. You can imagine how we
   realized this! This is why basically no system in the world appends client
   data directly to its binary on-disk structures. Decompression can
   potentially be highly optimized, though, by not fully instantiating
   messages.
   5. The current compression code re-compresses the data to assign it
   sequential offsets. It would be possible to improve this by allowing some
   kind of relative offset scheme where the individual messages would have
   offsets like (-3,-2,-1, 0) and this would be interpreted relative to the
   offset of the batch. This would let us avoid recompression for co-operating
   clients.
   6. This would likely require bumping the log version. Prior to doing
   this we need to have better backwards compatibility support in place to
   make this kind of upgrade easy to do.
   7. Optimizing de-compression and re-compression in mm requires having
   APIs that give you back uncompressed messages and let you send already
   compressed batches. This might be possible but it would break a lot of
   things like the proposed filters in mm. We would also need to do this in a
   way that it wasn't too gross of an API.

-Jay
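
A small sketch of the relative offset idea from point 5, under stated
assumptions: the class and method names are made up for illustration, and
the batch offset is taken to be the offset of the last message in the
batch, which is what relative offsets like (-3,-2,-1,0) imply. The point is
that the broker only has to stamp one number on the batch, so the
compressed inner messages never need to be rewritten.

```java
import java.util.Arrays;

public class RelativeOffsets {

    /** Offsets a cooperating client would write for n messages: -(n-1) .. 0. */
    public static long[] relativeOffsets(int n) {
        long[] rel = new long[n];
        for (int i = 0; i < n; i++) {
            rel[i] = i - (n - 1);
        }
        return rel;
    }

    /** The broker's only write: the batch offset (offset of the last message). */
    public static long assignBatchOffset(long nextLogOffset, int n) {
        return nextLogOffset + n - 1;
    }

    /** A reader resolves each message as batchOffset + relativeOffset. */
    public static long[] toAbsolute(long batchOffset, long[] relative) {
        long[] abs = new long[relative.length];
        for (int i = 0; i < relative.length; i++) {
            abs[i] = batchOffset + relative[i];
        }
        return abs;
    }

    public static void main(String[] args) {
        long[] rel = relativeOffsets(4);              // written by the client
        long batchOffset = assignBatchOffset(100, 4); // stamped by the broker
        System.out.println(Arrays.toString(rel));
        System.out.println(batchOffset);
        System.out.println(Arrays.toString(toAbsolute(batchOffset, rel)));
        // prints [-3, -2, -1, 0], then 103, then [100, 101, 102, 103]
    }
}
```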

On Sun, Feb 22, 2015 at 2:16 AM, Jan Filipiak jan.filip...@trivago.com
wrote:

 I just want to bring up that idea of no server side de/recompression
 again. Features like KAFKA-1499
 https://issues.apache.org/jira/browse/KAFKA-1499 seem to steer the project
 into a different direction and the fact that tickets like KAFKA-845
 https://issues.apache.org/jira/browse/KAFKA-845 are not getting much
 attention gives the same impression. This is something my head keeps
 spinning around almost 24/7 recently.

 The problem I see is that CPUs are not the cheapest part of a new server,
 and if you can spare a gigahertz or some cores by just making sure your
 configs are the same across all producers, I would always opt for the
 operational overhead instead of the bigger servers. I think this will
 usually decrease the TCO of Kafka installations.

 I am currently not familiar enough with the codebase to judge whether
 server side decompression happens before the acknowledgement. If so,
 sparing de/recompression would also let the broker respond some
 milliseconds faster.

 Those are my thoughts about server side de/recompression. It would be
 great if I could get some responses and thoughts back.

 Jan




 On 07.11.2014 00:23, Jay Kreps wrote:

 I suspect it is possible to save and reuse the CRCs though it might be a
 bit of an invasive change. I suspect the first usage is when we are
 checking the validity of the messages and the second is from when we
 rebuild the compressed message set (I'm assuming you guys are using
 compression because I think we optimize this out otherwise). Technically I
 think the CRCs stay the same.

 An alternative approach, though, would be working to remove the need for
 recompression entirely on the broker side by making the offsets in the
 compressed message relative to the base offset of the message set. This is
 a much more invasive change but potentially better as it would also remove
 the recompression done on the broker which is also CPU heavy.

 -Jay
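
The "save and reuse the CRCs" idea can be sketched roughly as below. This
is only an illustration, not the broker's code: the class name and fields
are hypothetical, and it assumes the checksummed bytes do not change
between validation and the later rebuild of the message set, so the value
computed the first time can simply be kept.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class CachedCrcMessage {
    private final byte[] payload;
    private long cachedCrc = -1;   // -1 means "not computed yet"; real CRC32 values are >= 0
    private int computations = 0;  // counts how often the checksum was actually calculated

    public CachedCrcMessage(byte[] payload) {
        // Copy so the cached checksum cannot go stale if the caller mutates the array.
        this.payload = payload.clone();
    }

    /** Computes the CRC on first use and returns the saved value afterwards. */
    public long crc() {
        if (cachedCrc < 0) {
            CRC32 crc32 = new CRC32();
            crc32.update(payload, 0, payload.length);
            cachedCrc = crc32.getValue();
            computations++;
        }
        return cachedCrc;
    }

    /** First use: validate what the client sent. */
    public boolean isValid(long expectedCrc) {
        return crc() == expectedCrc;
    }

    public int computations() {
        return computations;
    }

    public static void main(String[] args) {
        CachedCrcMessage m =
            new CachedCrcMessage("123456789".getBytes(StandardCharsets.US_ASCII));
        System.out.println(m.isValid(0xCBF43926L)); // validation pass
        System.out.println(m.crc() == 0xCBF43926L); // second use reuses the saved value
        System.out.println(m.computations());
        // prints true, true, 1
    }
}
```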

 On Thu, Nov 6, 2014 at 2:36 PM, Allen Wang aw...@netflix.com.invalid
 wrote:

  Sure. Here is the link to the screen shot of jmc with the JTR file
 loaded:

 http://picpaste.com/fligh-recorder-crc.png



 On Thu, Nov 6, 2014 at 2:12 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Allen,

 Apache mailing lists don't allow attachments. Could you please link to a
 pastebin or something?

 Thanks,
 Neha

 On Thu, Nov 6, 2014 at 12:02 PM, Allen Wang aw...@netflix.com.invalid
 wrote:

  After digging more into the stack trace got from flight recorder (which
  is attached), it seems that Kafka (0.8.1.1) can optimize the usage of
  Crc32. The stack trace

Re: New Producer - Is the configurable partitioner gone?

2015-02-22 Thread Jay Kreps
Interesting, and this was with the new Java client? This sounds like as
much an opportunity for improvement in the code as anything. Would you be
willing to share the details?

-jay

On Sunday, February 22, 2015, Steven Wu stevenz...@gmail.com wrote:

  The low connection partitioner might work for this
 by attempting to reuse recently used nodes whenever possible. That is
 useful in environments with lots and lots of producers where you don't care
 about semantic partitioning.

 In one of our perf tests, we found that the sticky partitioner described
 above improved batching and reduced CPU utilization on the broker side by
 60%. We plan to make it our default partitioner.



