On Wed, Apr 29, 2015 at 6:08 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

> I'm starting to think that the old adage "If two people say you are drunk,
> lie down" applies here :)
>
> Current API seems perfectly clear, useful and logical to everyone who wrote
> it... but we are getting multiple users asking for the old batch behavior
> back.
> One reason to get it back is to make upgrades easier - people won't need to
> rethink their existing logic if they get an API with the same behavior in
> the new producer. The other reason is what Ewen mentioned earlier - if
> everyone re-implements Joel's logic, we can provide something for that.
>
> How about getting the old batch send behavior back by adding a new API
> with:
> public void batchSend(List<ProducerRecord<K,V>>)
>
> With this implementation (mixes the old behavior with Joel's snippet):
> * send records one by one
> * flush
> * iterate on futures and "get" them
> * log a detailed message on each error
> * throw an exception if any send failed.
>
> It reproduces the old behavior - which apparently everyone really liked,
> and I don't think it is overly weird. It is very limited, but anyone who
> needs more control over his sends already have plenty of options.
>

First, I'll just say that I actually prefer a smaller API -- most wrappers
are trivial and if someone reuse them *that* often, they can always package
them into a wrapper API. Implementing everything people repeat can be a
slippery slope towards including every pattern under the sun. I'm
definitely skeptical of adding to the core API if there isn't something
that requires a specialized implementation. If it works as a trivial
wrapper that people want in Kafka proper, then it'd probably be better
placed in a contrib package or implemented as a separate utility wrapper
class that guarantees isolation from the main producer implementation.

Second, one of the reasons I mentioned alternative APIs is that I think the
behavior you'd get with this API is fairly confusing and hard to work with
given the behavior I *think* people are actually looking for. I suspect the
reason they want synchronous batch behavior is to get atomic batch writes.
They aren't going to get this anyway (as Jay pointed out) given the current
broker implementation, but you could get a close approximation if you have
max 1 in flight request and retry indefinitely (with the retries going to
the front of the queue to avoid out-of-order writes). That has the drawback
that failures never bubble up. Alternatively, you need to be able to
immediately clear the buffered records upon a failure.

I mentioned the fwrite()-like API because it lets you expose semantics like
this in a way I think can be pretty clean -- give the producer a list of
records and it will use as many as it can without a) violating the buffer
restrictions and b) ensuring that all the accepted requests will be
included in the same request to the broker. You're guaranteed that there
are no "extra" records in the accumulator if something fails, which lets
you get an error back without the possibility of message reordering,
allowing you to handle the issue however you like. (This still has a ton of
issues if you introduce client-side timeouts, which is why
https://cwiki.apache.org/confluence/display/KAFKA/Idempotent+Producer would
be the real solution.)

I was also sensitive to the possible performance issues because I was
recently bitten by an issue where some code was unexpectedly processing
characters one at a time with really awful performance as a result, and a
small bit of batching solved the problem :) But here, I agree with Jay that
there's enough other stuff going on under the hood that this alone isn't
likely to have a huge impact. Then again, I'd love to see some profiler
numbers from a semi-realistic workload!

On clarity of and ease of use of APIs, and coming back to the behavior I
think people are looking for -- sometimes the challenge isn't just
documenting the API you've concluded is the right one, but also explaining
why the API everyone seems to want can't work/was broken/doesn't provide
the guarantees they thought it did.


> Thoughts?
>
> Gwen
>
>
>
>
> On Tue, Apr 28, 2015 at 5:29 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey guys,
> >
> > The locking argument is correct for very small records (< 50 bytes),
> > batching will help here because for small records locking becomes the big
> > bottleneck. I think these use cases are rare but not unreasonable.
> >
> > Overall I'd emphasize that the new producer is way faster at virtually
> all
> > use cases. If there is a use case where that isn't true, let's look at it
> > in a data driven way by comparing the old producer to the new producer
> and
> > looking for any areas where things got worse.
> >
> > I suspect the "reducing allocations" argument to be not a big thing. We
> do
> > a number of small per-message allocations and it didn't seem to have much
> > impact. I do think there are a couple of big producer memory
> optimizations
> > we could do by reusing the arrays in the accumulator in the serialization
> > of the request but I don't think this is one of them.
> >
> > I'd be skeptical of any api that was too weird--i.e. introduces a new way
> > of partitioning, gives back errors on a per-partition rather than per
> > message basis (given that partitioning is transparent this is really hard
> > to think about), etc. Bad apis end up causing a ton of churn and just
> don't
> > end up being a good long term commitment as we change how the underlying
> > code works over time (i.e. we hyper optimize for something then have to
> > maintain some super weird api as it becomes hyper unoptimized for the
> > client over time).
> >
> > Roshan--Flush works as you would hope, it blocks on the completion of all
> > outstanding requests. Calling get on the future for the request gives you
> > the associated error code back. Flush doesn't throw any exceptions
> because
> > waiting for requests to complete doesn't error, the individual requests
> > fail or succeed which is always reported with each request.
> >
> > Ivan--The batches you send in the scala producer today actually aren't
> > truely atomic, they just get sent in a single request.
> >
> > One tricky problem to solve when user's do batching is size limits on
> > requests. This can be very hard to manage since predicting the serialized
> > size of a bunch of java objects is not always obvious. This was
> repeatedly
> > a problem before.
> >
> > -Jay
> >
> > On Tue, Apr 28, 2015 at 4:51 PM, Ivan Balashov <ibalas...@gmail.com>
> > wrote:
> >
> > > I must agree with @Roshan – it's hard to imagine anything more
> intuitive
> > > and easy to use for atomic batching as old sync batch api. Also, it's
> > fast.
> > > Coupled with a separate instance of producer per
> > > broker:port:topic:partition it works very well. I would be glad if it
> > finds
> > > its way into new producer api.
> > >
> > > On a side-side-side note, could anyone confirm/deny if SimpleConsumer's
> > > fetchSize must be set at least as batch bytes (before or after
> > > compression), otherwise client risks not getting any messages?
> > >
> >
>



-- 
Thanks,
Ewen

Reply via email to