A couple of thoughts:

1. @Joel I agree it's not hard to use the new API, but it definitely is
more verbose. If that snippet of code is being written across hundreds of
projects, that probably means we're missing an important API. Right now
I've only seen the one complaint, but it's worth finding out how many
people feel it's missing. And given that internally each of the returned
Futures just uses the future for the entire batch, it's probably worth
investigating whether getting rid of millions of allocations per second
matters, even if they should stay in the nursery and be fast to collect.

2. For lots of small messages, there's definitely the potential for a
performance benefit from avoiding repeated lock acquire/release in send().
If you make a first pass to group records by topic partition and then
process each group, you acquire the lock once per partition rather than
once per message (rough sketch below). One major drawback I see is that
this seems to make a mess of error handling/blocking when the
RecordAccumulator runs out of space.
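
To make that concrete, here's a rough sketch of the grouping pass
(GroupingPass/groupByPartition are made-up names, and this assumes each
record already carries an explicit partition; in reality the producer
would have to run the partitioner first):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class GroupingPass {
        // One unsynchronized pass to bucket records by partition; a batch
        // send could then append each bucket under a single lock
        // acquisition instead of locking once per record.
        public static <K, V> Map<TopicPartition, List<ProducerRecord<K, V>>>
        groupByPartition(List<ProducerRecord<K, V>> records) {
            Map<TopicPartition, List<ProducerRecord<K, V>>> groups =
                new HashMap<>();
            for (ProducerRecord<K, V> record : records) {
                // Assumption: record.partition() is non-null here.
                TopicPartition tp =
                    new TopicPartition(record.topic(), record.partition());
                groups.computeIfAbsent(tp, k -> new ArrayList<>()).add(record);
            }
            return groups;
        }
    }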

3. @Roshan In the other thread you mentioned 10 byte messages. Is this a
realistic payload size for you? I can imagine applications where it is
(and we should support those well); it just sounds unusually small.

4. I reproduced Jay's benchmark blog post a while ago in an automated test
(see
https://github.com/confluentinc/muckrake/blob/master/muckrake/tests/kafka_benchmark_test.py).
Here's a snippet from the output on m3.2xlarge instances that might help
shed some light on the situation:
INFO:_.KafkaBenchmark:Message size:
INFO:_.KafkaBenchmark: 10: 1637825.195625 rec/sec (15.620000 MB/s)
INFO:_.KafkaBenchmark: 100: 605504.877911 rec/sec (57.750000 MB/s)
INFO:_.KafkaBenchmark: 1000: 90351.817570 rec/sec (86.170000 MB/s)
INFO:_.KafkaBenchmark: 10000: 8306.180862 rec/sec (79.210000 MB/s)
INFO:_.KafkaBenchmark: 100000: 978.403499 rec/sec (93.310000 MB/s)

That's using the new ProducerPerformance class, which is single-threaded,
so the m3.2xlarge's number of cores probably has little influence. There's
clearly a sharp increase in throughput from 10 -> 100 byte messages. I
recall double-checking that the CPU was fully utilized. Note that this is
with the acks=1 setting that doesn't actually exist anymore, so take it
with a grain of salt.

5. I'd suggest that there may be other APIs that give the implementation
more flexibility but still provide batching. For example (both options are
sketched after this list):
* Require batched inputs to be prepartitioned, so each call specifies the
TopicPartition. The main benefit here is that the producer avoids having
to do all the sorting, which the application may already be doing anyway.
* Provide an API similar to fwrite(), where you hand over a set of
messages but it may only write some of them and tells you how many it
wrote. This could be a clean way to expose the underlying batching without
being a completely leaky abstraction. We could then return a single future
for the entire batch, do minimal locking, etc. I'm not sure how to handle
different TopicPartitions in the same set, but I think this could be a
good pattern for people who want maximally efficient ordered writes with
proper error handling.
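
Here's roughly what I have in mind for those two shapes. These are purely
hypothetical signatures (BatchSend, sendBatch, trySend are all invented
names; nothing like this exists in the producer today):

    import java.util.List;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.TopicPartition;

    public interface BatchSend<K, V> {
        // (a) Prepartitioned batch: the caller has already grouped records
        // by partition, so the producer can append the whole group under
        // one lock and hand back a single future for the entire batch.
        Future<RecordMetadata> sendBatch(TopicPartition tp,
                                         List<ProducerRecord<K, V>> records);

        // (b) fwrite()-style partial write: append as many records as
        // currently fit in the accumulator without blocking and return the
        // count written; the caller retries from
        // records.subList(written, records.size()) once space frees up.
        int trySend(TopicPartition tp, List<ProducerRecord<K, V>> records);
    }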

6. If I recall correctly, doesn't compression occur in a synchronized
block, in the RecordAccumulator? Or maybe it was in the network thread? In
any case, I seem to recall compression also playing an important role in
performance, since it operates over a set of records, which limits where
you can run it. @Roshan, are you using compression, both in your
microbenchmarks and in your application?

I think there's almost certainly a good case to be made for a batch API,
but it probably needs some very clear motivating use cases and perf
measurements showing why it can't be accomplished with the current API
plus a few helpers that wrap it in a batch API (one such helper is
sketched below).
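
For reference, such a helper could be as small as this. It's just Joel's
pattern from the quoted mail below factored into a method (BatchHelper and
sendBatch are names I made up):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Future;

    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class BatchHelper {
        // Send every record, flush, then block until all are acked. The
        // first failure propagates as an exception so the caller can
        // retry the whole batch, old-style.
        public static <K, V> void sendBatch(Producer<K, V> producer,
                List<ProducerRecord<K, V>> records) throws Exception {
            List<Future<RecordMetadata>> futures =
                new ArrayList<>(records.size());
            for (ProducerRecord<K, V> record : records) {
                futures.add(producer.send(record));
            }
            producer.flush();
            for (Future<RecordMetadata> future : futures) {
                future.get();
            }
        }
    }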

-Ewen


On Mon, Apr 27, 2015 at 4:24 PM, Joel Koshy <jjkosh...@gmail.com> wrote:

>
> >   Fine grained tracking of status of individual events is quite painful
> in
> > contrast to simply blocking on every batch. Old style Batched-sync mode
> > has great advantages in terms of simplicity and performance.
>
> I may be missing something, but I'm not so convinced that it is that
> painful/very different from the old-style.
>
> In the old approach, you would compose a batch (in a list of messages)
> and do a synchronous send:
>
> try {
>   producer.send(recordsToSend)
> }
> catch (...) {
>   // handle (e.g., retry sending recordsToSend)
> }
>
> In the new approach, you would do (something like) this:
>
> for (record: recordsToSend) {
>   futureList.add(producer.send(record));
> }
> producer.flush();
> for (result: futureList) {
>   try { result.get(); }
>   catch (...) { // handle (e.g., retry sending recordsToSend) }
> }


-- 
Thanks,
Ewen
