Re: [VOTE] KIP-629: Use racially neutral terms in our codebase

2020-06-25 Thread Jay Kreps
+1

On Thu, Jun 25, 2020 at 6:39 PM Bill Bejeck  wrote:

> Thanks for this KIP Xavier.
>
> +1(binding)
>
> -Bill
>
> On Thu, Jun 25, 2020 at 9:04 PM Gwen Shapira  wrote:
>
> > +1 (binding)
> >
> > Thank you Xavier!
> >
> > On Thu, Jun 25, 2020, 3:44 PM Xavier Léauté  wrote:
> >
> > > Hi Everyone,
> > >
> > > I would like to initiate the voting process for KIP-629.
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase
> > >
> > > Thank you,
> > > Xavier
> > >
> >
>


Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-03-21 Thread Jay Kreps
Hey Dong,

Makes total sense. What I'm saying is I don't think that the sanity check
is part of any formal guarantee we provide. It is true that corruption of
data flushed to disk will be a potential problem, but I don't think the
sanity check solves that it just has a couple heuristics to help detect
certain possible instances of it, right? In general I think our assumption
has been that flushed data doesn't disappear or get corrupted and if it
does you need to manually intervene. I don't think people want to configure
things at this level so what I was suggesting was understanding why the
sanity check is slow and trying to avoid that rather than making it
configurable. I think you mentioned it was reading the full index into
memory. Based on the performance you describe this could be true, but it
definitely should not be reading anything but the last entry in the index
so that would be a bug. That read also happens in sanityCheck() only in the
time-based index right? In the offset index we do the same read but it
happens in initialization. If that read is the slow thing it might make
sense to try to remove it or make it lazy in both cases. If it is some
other part of the code then (e.g. the size check) then that may be able to
be avoided entirely (I think by the time we sanity check we already know
the file size from the mapping...). That was what I meant by doing some
data driven analysis. Maybe a quick run with hprof would help determine the
root cause of why sanityCheck is slow?

-Jay

On Tue, Mar 20, 2018 at 12:13 AM Dong Lin <lindon...@gmail.com> wrote:

> Hey Jay,
>
> Thanks for your comments!
>
> Yeah recovery is different from the sanity check. They are correlated in
> the sense that there may still be corrupted index files even after clean
> broker shutdown. And in that case if we delay the sanity check then we may
> delay the log recovery. The main goal of this KIP is to optimize the sanity
> check related work so that it does not delay the broker startup much.
>
> The KIP mentioned that the sanity check is done using log recovery
> background thread. The name "recovery" is mentioned mainly because the
> background thread number is determined using the existing
> config num.recovery.threads.per.data.dir. I have updated the KIP to make
> this less confusing.
>
> It makes a ton of sense to optimize the broker startup time in a data
> driven fashion. The currently optimize is done kind of in this fashion. The
> broker log shows that LogManager.loadLogs() takes a long time in large
> clusters. Then I started broker with cold cache and repeatedly get thread
> dump to see what are broker threads are doing during LogManager.loadLogs().
> Most of the threads are working on sanityCheck() and this motivates the
> change in this KIP. Previously broker shutdown time was investigated in a
> similar data driven fashion and optimized with KAFKA-6172 and KAFKA-6175.
> It seems that the current KIP can reduces the rolling bounce time of a
> large cluster by 50% -- there may be room for further improvement but maybe
> those do not require as big a change (with the caveat described in the KIP)
> as suggested in this KIP.
>
> It is not clear whether it is safe to just read the latest segment without
> sanity checking all previous inactive segment of a given partition if
> transaction is used. Otherwise we probably want to always skip the sanity
> check of inactive segments without introducing a new config. Maybe the
> developers familiar with the transaction can comment on that?
>
> Thanks,
> Dong
>
>
> On Mon, Mar 19, 2018 at 7:21 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > Optimizing startup seems really valuable but I'm a little confused by
> this.
> >
> > There are two different things:
> > 1. Recovery
> > 2. Sanity check
> >
> > The terminology we're using is a bit mixed here.
> >
> > Recovery means checksumming the log segments and rebuilding the index on
> a
> > hard crash. This only happens on unflushed segments, which is generally
> > just the last segment. Recovery is essential for the correctness
> guarantees
> > of the log and you shouldn't disable it. It only happens on hard crash
> and
> > is not a factor in graceful restart. We can likely optimize it but that
> > would make most sense to do in a data driven fashion off some profiling.
> >
> > However there is also a ton of disk activity that happens during
> > initialization (lots of checks on the file size, absolute path, etc). I
> > think these have crept in over time with people not really realizing this
> > code is perf sensitive and java hiding a lot of what is and isn't a file
> > operation. One part of this is the sanityCheck() call for the two
> indexes.
> &g

Re: [DISCUSS] KIP-263: Allow broker to skip sanity check of inactive segments on broker startup

2018-03-19 Thread Jay Kreps
Optimizing startup seems really valuable but I'm a little confused by this.

There are two different things:
1. Recovery
2. Sanity check

The terminology we're using is a bit mixed here.

Recovery means checksumming the log segments and rebuilding the index on a
hard crash. This only happens on unflushed segments, which is generally
just the last segment. Recovery is essential for the correctness guarantees
of the log and you shouldn't disable it. It only happens on hard crash and
is not a factor in graceful restart. We can likely optimize it but that
would make most sense to do in a data driven fashion off some profiling.

However there is also a ton of disk activity that happens during
initialization (lots of checks on the file size, absolute path, etc). I
think these have crept in over time with people not really realizing this
code is perf sensitive and java hiding a lot of what is and isn't a file
operation. One part of this is the sanityCheck() call for the two indexes.
I don't think this call reads the full index, just the last entry in the
index, right?. There should be no need to read the full index except during
recovery (and then only for the segments being recovered). I think it would
make a ton of sense to optimize this but I don't think that optimization
needs to be configurable as this is just a helpful sanity check to detect
common non-sensical things in the index files, but it isn't part of the
core guarantees, in general you aren't supposed to lose committed data from
disk, and if you do we may be able to fail faster but we fundamentally
can't really help you. Again I think this would make the most sense to do
in a data driven way, if you look at that code I think it is doing crazy
amounts of file operations (e.g. getAbsolutePath, file sizes, etc). I think
it'd make most sense to profile startup with a cold cash on a large log
directory and do the same with an strace to see how many redundant system
calls we do per segment and what is costing us and then cut some of this
out. I suspect we could speed up our startup time quite a lot if we did
that.

For example we have a bunch of calls like this:

require(len % entrySize == 0,

"Index file " + file.getAbsolutePath + " is corrupt, found " +
len +

" bytes which is not positive or not a multiple of 8.")
I'm pretty such file.getAbsolutePath is a system call and I assume that
happens whether or not you fail the in-memory check?

-Jay


On Sun, Feb 25, 2018 at 10:27 PM, Dong Lin  wrote:

> Hi all,
>
> I have created KIP-263: Allow broker to skip sanity check of inactive
> segments on broker startup. See
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 263%3A+Allow+broker+to+skip+sanity+check+of+inactive+
> segments+on+broker+startup
> .
>
> This KIP provides a way to significantly reduce time to rolling bounce a
> Kafka cluster.
>
> Comments are welcome!
>
> Thanks,
> Dong
>


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-07 Thread Jay Kreps
Hey Jason,

I agree. Even apart from this proposal the partitioning strategy is really
an essential part of the metadata for a topic and had we been less lazy we
probably would have included it with the topic metadata.

I think in terms of grandfathering this in you could have existing topics
just be auto-assigned a "client" partitioning and add a "linear" strategy
(or whatever) that is that is checked server-side and supported in terms of
re-partitioning.

-Jay

On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson  wrote:

> Great discussion. I think I'm wondering whether we can continue to leave
> Kafka agnostic to the partitioning strategy. The challenge is communicating
> the partitioning logic from producers to consumers so that the dependencies
> between each epoch can be determined. For the sake of discussion, imagine
> you did something like the following:
>
> 1. The name (and perhaps version) of a partitioning strategy is stored in
> topic configuration when a topic is created.
> 2. The producer looks up the partitioning strategy before writing to a
> topic and includes it in the produce request (for fencing). If it doesn't
> have an implementation for the configured strategy, it fails.
> 3. The consumer also looks up the partitioning strategy and uses it to
> determine dependencies when reading a new epoch. It could either fail or
> make the most conservative dependency assumptions if it doesn't know how to
> implement the partitioning strategy. For the consumer, the new interface
> might look something like this:
>
> // Return the partition dependencies following an epoch bump
> Map dependencies(int numPartitionsBeforeEpochBump,
> int numPartitionsAfterEpochBump)
>
> The unordered case then is just a particular implementation which never has
> any epoch dependencies. To implement this, we would need some way for the
> consumer to find out how many partitions there were in each epoch, but
> maybe that's not too unreasonable.
>
> Thanks,
> Jason
>
>
> On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak 
> wrote:
>
> > Hi Dong
> >
> > thank you very much for your questions.
> >
> > regarding the time spend copying data across:
> > It is correct that copying data from a topic with one partition mapping
> to
> > a topic with a different partition mapping takes way longer than we can
> > stop producers. Tens of minutes is a very optimistic estimate here. Many
> > people can not afford copy full steam and therefore will have some rate
> > limiting in place, this can bump the timespan into the day's. The good
> part
> > is that the vast majority of the data can be copied while the producers
> are
> > still going. One can then, piggyback the consumers ontop of this
> timeframe,
> > by the method mentioned (provide them an mapping from their old offsets
> to
> > new offsets in their repartitioned topics. In that way we separate
> > migration of consumers from migration of producers (decoupling these is
> > what kafka is strongest at). The time to actually swap over the producers
> > should be kept minimal by ensuring that when a swap attempt is started
> the
> > consumer copying over should be very close to the log end and is expected
> > to finish within the next fetch. The operation should have a time-out and
> > should be "reattemtable".
> >
> > Importance of logcompaction:
> > If a producer produces key A, to partiton 0, its forever gonna be there,
> > unless it gets deleted. The record might sit in there for years. A new
> > producer started with the new partitions will fail to delete the record
> in
> > the correct partition. Th record will be there forever and one can not
> > reliable bootstrap new consumers. I cannot see how linear hashing can
> solve
> > this.
> >
> > Regarding your skipping of userland copying:
> > 100%, copying the data across in userland is, as far as i can see, only a
> > usecase for log compacted topics. Even for logcompaction + retentions it
> > should only be opt-in. Why did I bring it up? I think log compaction is a
> > very important feature to really embrace kafka as a "data plattform". The
> > point I also want to make is that copying data this way is completely
> > inline with the kafka architecture. it only consists of reading and
> writing
> > to topics.
> >
> > I hope it clarifies more why I think we should aim for more than the
> > current KIP. I fear that once the KIP is done not much more effort will
> be
> > taken.
> >
> >
> >
> >
> > On 04.03.2018 02:28, Dong Lin wrote:
> >
> >> Hey Jan,
> >>
> >> In the current proposal, the consumer will be blocked on waiting for
> other
> >> consumers of the group to consume up to a given offset. In most cases,
> all
> >> consumers should be close to the LEO of the partitions when the
> partition
> >> expansion happens. Thus the time waiting should not be long e.g. on the
> >> order of seconds. On the other hand, it may take a long time to wait for
> >> the entire partition 

Re: [VOTE] KIP-186: Increase offsets retention default to 7 days

2018-03-07 Thread Jay Kreps
+1

I think we can improve this in the future, but this simple change will
avoid a lot of pain. Thanks for reviving it Ewen.

-Jay

On Mon, Mar 5, 2018 at 11:35 AM, Ewen Cheslack-Postava 
wrote:

> I'd like to kick off voting for KIP-186:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 186%3A+Increase+offsets+retention+default+to+7+days
>
> This is the trivial fix that people in the DISCUSS thread were in favor of.
> There are some ideas for further refinements, but I think we can follow up
> with those in subsequent KIPs, see the discussion thread for details. Also
> note this is related, but complementary, to
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets
> .
>
> And of course +1 (binding) from me.
>
> Thanks,
> Ewen
>


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-02 Thread Jay Kreps
Hey Dong,

Cool, obviously we'd need to have a solution here work with connect and
streams to be viable.

On the linear hashing thing, what I am talking about is something
different. I am talking about splitting existing partitions incrementally.
E.g. if you have 100 partitions and want to move to 110. Obviously a naive
approach which added partitions would require you to reshuffle all data as
the hashing of all data would change. A linear hashing-like scheme gives an
approach by which you can split individual partitions one at a time to
avoid needing to reshuffle much data. This approach has the benefit that at
any time you have a fixed number of partitions and all data is fully
partitioned with whatever the partition count you choose is but also has
the benefit that you can dynamically scale up or down the partition count.
This seems like it simplifies things like log compaction etc.

-Jay

On Sun, Feb 25, 2018 at 3:51 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jay,
>
> Thanks for the comment!
>
> I have not specifically thought about how this works with Streams and
> Connect. The current KIP w.r.t. the interface that our producer and
> consumer exposes to the user. It ensures that if there are two messages
> with the same key produced by the same producer, say messageA and messageB,
> and suppose messageB is produced after messageA to a different partition
> than messageA, then we can guarantee that the following sequence can happen
> in order:
>
> - Consumer of messageA can execute callback, in which user can flush state
> related to the key of messageA.
> - messageA is delivered by its consumer to the application
> - Consumer of messageB can execute callback, in which user can load the
> state related to the key of messageB.
> - messageB is delivered by its consumer to the application.
>
> So it seems that it should support Streams and Connect properly. But I am
> not entirely sure because I have not looked into how Streams and Connect
> works. I can think about it more if you can provide an example where this
> does not work for Streams and Connect.
>
> Regarding the second question, I think linear hashing approach provides a
> way to reduce the number of partitions that can "conflict" with a give
> partition to *log_2(n)*, as compares to *n* in the current KIP, where n is
> the total number of partitions of the topic. This will be useful when
> number of partition is large and asymptotic complexity matters.
>
> I personally don't think this optimization is worth the additional
> complexity in Kafka. This is because partition expansion or deletion should
> happen infrequently and the largest number of partitions of a single topic
> today is not that large -- probably 1000 or less. And when partitions of a
> topic changes, each consumer will likely need to query and wait for
> positions of a large percentage of partitions of the topic anyway even with
> this optimization. I think this algorithm is kind of orthogonal to this
> KIP. We can extend the KIP to support this algorithm in the future as well.
>
> Thanks,
> Dong
>
> On Thu, Feb 22, 2018 at 5:19 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Dong,
> >
> > Two questions:
> > 1. How will this work with Streams and Connect?
> > 2. How does this compare to a solution where we physically split
> partitions
> > using a linear hashing approach (the partition number is equivalent to
> the
> > hash bucket in a hash table)? https://en.wikipedia.org/wiki/
> Linear_hashing
> >
> > -Jay
> >
> > On Sat, Feb 10, 2018 at 3:35 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > I have created KIP-253: Support in-order message delivery with
> partition
> > > expansion. See
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 253%3A+Support+in-order+message+delivery+with+partition+expansion
> > > .
> > >
> > > This KIP provides a way to allow messages of the same key from the same
> > > producer to be consumed in the same order they are produced even if we
> > > expand partition of the topic.
> > >
> > > Thanks,
> > > Dong
> > >
> >
>


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-02-22 Thread Jay Kreps
Hey Dong,

Two questions:
1. How will this work with Streams and Connect?
2. How does this compare to a solution where we physically split partitions
using a linear hashing approach (the partition number is equivalent to the
hash bucket in a hash table)? https://en.wikipedia.org/wiki/Linear_hashing

-Jay

On Sat, Feb 10, 2018 at 3:35 PM, Dong Lin  wrote:

> Hi all,
>
> I have created KIP-253: Support in-order message delivery with partition
> expansion. See
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 253%3A+Support+in-order+message+delivery+with+partition+expansion
> .
>
> This KIP provides a way to allow messages of the same key from the same
> producer to be consumed in the same order they are produced even if we
> expand partition of the topic.
>
> Thanks,
> Dong
>


Re: [DISCUSS] KIP 226 - Dynamic Broker Configuration

2017-12-18 Thread Jay Kreps
Two thoughts on implementation (shouldn't effect the KIP):

   1. It might be nice to add a parameter to ConfigDef which says whether a
   configuration is dynamically updatable or not so that we can give error
   messages if it isn't and also have it reflected in the auto-generated docs.
   2. For many systems they don't really need to take action if a config
   changes, they just need to use the new value. Changing them all to
   Reconfigurable requires managing a fair amount of mutability in each class
   that accepts changes. Some need this since they need to take actions when a
   config changes, but it seems like many just need to update some value. For
   the later you might just be able to do something like what we do for
   LogConfig where there is a single CurrentConfig instance that has a
   reference to the current KafkaConfig and always reference your configurable
   parameters via this (e.g. config.current.myConfig). Dunno if that is
   actually better, but thought I'd throw it out there.

-Jay

On Sun, Dec 10, 2017 at 8:09 AM, Rajini Sivaram 
wrote:

> Hi Jun,
>
> Thank you!
>
> 5. Yes, that makes sense. Agree that we don't want to add protocol changes
> to *UpdateMetadataRequest* in this KIP. I have moved the update of
> *log.message.format.version* and *inter.broker.protocol.version* to reduce
> restarts during upgrade to* Future Work*. We can do this in a follow-on
> KIP.
>
> I will wait for another day to see if there are any more comments and start
> vote on Tuesday if there are no other concerns.
>
>
> On Sat, Dec 9, 2017 at 12:22 AM, Jun Rao  wrote:
>
> > Hi, Rajini,
> >
> > Thanks for the reply. They all make sense.
> >
> > 5. Got it. Note that currently, only live brokers are registered in ZK.
> > Another thing is that I am not sure that we want every broker to read all
> > broker registrations directly from ZK. It's probably better to have the
> > controller propagate this information. That will require changing the
> > UpdateMetadataRequest protocol though. So, I am not sure if you want to
> do
> > that in the same KIP.
> >
> > Jun
> >
> >
> >
> > On Fri, Dec 8, 2017 at 6:07 AM, Rajini Sivaram 
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Thank you for the review.
> > >
> > > 1. No, I am hoping to migrate partitions to new threads. We just need
> to
> > > ensure they don't run concurrently.
> > >
> > > 2. AdminClient has a validateOnly option for AlterConfigs. Any
> exceptions
> > > or return value of false from Reconfigurable#validate would fail the
> > > AlterConfigsRequest.
> > >
> > > 3. Yes, we will support describe and alter of configs with listener
> > prefix.
> > > We will not allow alterConfigs() of security configs without the
> listener
> > > prefix, since we need to prevent the whole cluster being made unusable.
> > >
> > > 4. Thank you, will make a note of that.
> > >
> > > 5. When we are upgrading (from 1.0 to 2.0 for example), my
> understanding
> > is
> > > that we set inter.broker.protocol.version=1.0, do a rolling upgrade of
> > the
> > > whole cluster and when all brokers are at 2.0, we do another rolling
> > > upgrade with inter.broker.protocol.version=2.0. Jason's suggestion was
> > to
> > > avoid the second rolling upgrade by enabling dynamic update of
> > > inter.broker.protocol.version. To set inter.broker.protocol.version=
> 2.0
> > > dynamically, we need to verify not just that the current broker is on
> > > version 2.0, but that all brokers int the cluster are on version 2.0 (I
> > > thought that was the reason for the second rolling upgrade). The broker
> > > version in ZK would enable that verification before performing the
> > update.
> > >
> > > 6. The config source would be STATIC_BROKER_CONFIG/DYNAMIC_
> > BROKER_CONFIG,
> > > the config name would be listener.name.listenerA.configX. And synonyms
> > > list
> > > in describeConfigs() would list  listener.name.listenerA.configX as
> well
> > > as
> > > configX.
> > >
> > > 7. I think `default` is an overused terminology already. When configs
> are
> > > described, they return a flag indicating if the value is a default. And
> > in
> > > the broker, we have so many levels of configs already and we are adding
> > so
> > > many more, that it may be better to use a different term. It doesn't
> have
> > > to be synonyms, but since we want to use the same term for topics and
> > > brokers and we have listener configs which override non-prefixed
> security
> > > configs, perhaps it is ok?
> > >
> > > Regards,
> > >
> > > Rajini
> > >
> > >
> > >
> > > On Wed, Dec 6, 2017 at 11:50 PM, Jun Rao  wrote:
> > >
> > > > A couple more things.
> > > >
> > > > 6. For the SSL/SASL configurations with the listener prefix, do we
> need
> > > > another level in config_source since it's neither topic nor broker?
> > > >
> > > > 7. For include_synonyms in DescribeConfigs, the name makes sense for
> > the
> > > > topic level configs. Not 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-22 Thread Jay Kreps
Hey Colin,

WRT memory management I think what you are saying is that you would add a
field to the fetch request which would request that the server cache the
set of partitions and the response would have a field indicating whether
that happened or not. This would allow a bound on memory.

I was also thinking there could be mechanical improvements that would help
efficiency such as sharing topic name or TopicPartition objects to reduce
the footprint in a flyweight style. If you think about it there is already
some memory overhead on a per-connection basis for socket buffers and
purgatory so a little more might be okay.

-Jay

On Wed, Nov 22, 2017 at 1:46 PM, Colin McCabe <cmcc...@apache.org> wrote:

> On Wed, Nov 22, 2017, at 13:43, Colin McCabe wrote:
> > On Wed, Nov 22, 2017, at 13:08, Jay Kreps wrote:
> > > Okay yeah, what I said didn't really work or make sense. Ismael's
> > > interpretation is better.
> > >
> > > Couple of things to point out:
> > >
> > >1. I'm less sure that replication has a high partition count and
> > >consumers don't. There are definitely use cases for consumers that
> > >subscribe to everything (e.g. load all my data into HDFS) as well as
> > >super high partition count topics. In a bigger cluster it is
> unlikely a
> > >given node is actually replicating that many partitions from another
> > >particular node (though perhaps in aggregate the effect is the
> same).
> > >I think it would clearly be desirable to have a solution that
> targeted
> > >both the consumer and replication if that were achievable.
> >
> > Hmm.  I hadn't considered the possibility that consumers might want to
> > subscribe to a huge number of topics.  That's a fair point (especially
> > with the replication example).
> >
> > >I agree with the concern on memory, but perhaps there could be a
> way to
> > >be smart about the memory usage?
> >
> > One approach would be to let clients compete for a configurable number
> > of cache slots on the broker.  So only the first N clients to ask for an
> > incremental fetch request UUID would receive one.  You could combine
> > this with making the clients not request an incremental fetch request
> > unless they were following more than some configurable number of
> > partitions (like 10).  That way you wouldn't waste all your cache slots
> > on clients that were only following 1 or 2 partitions, and hence
> > wouldn't benefit much from the optimization.
>
> By the way, I was envisioning the cache slots as something that would
> time out.  So if a client created an incremental fetch UUID and then
> disappeared, we'd eventually purge its cached offsets and let someone
> else use the memory.
>
> C.
>
> >
> > This is basically a bet on the idea that if you have clients following a
> > huge number of partitions, you probably will only have a limited number
> > of such clients.  Arguably, if you have a huge number of clients
> > following a huge number of partitions, you are going to have performance
> > problems anyway.
> >
> > >2. For the question of one request vs two, one difference in values
> > >here may be that it sounds like you are proposing a less ideal
> protocol to
> > >simplify the broker code. To me the protocol is really *the*
> > >fundamental interface in Kafka and we should really strive to make
> that
> > >something that is beautiful and makes sense on its own (without
> needing
> > >to understand the history of how we got there). I think there may
> well
> > >be such an explanation for the two API version (as you kind of said
> with
> > >your HDFS analogy) but really making it clear how these two APIs are
> > >different and how they interact is key. Like, basically I think we
> should
> > >be able to explain it from scratch in such a way that it is obvious
> you'd
> > >have these two things as the fundamental primitives for fetching
> data.
> >
> > I can see some arguments for having a single API.  One is that both
> > incremental and full fetch requests will travel along a similar code
> > path.  There will also be a lot of the same fields in both the request
> > and the response.  Separating the APIs means duplicating those fields
> > (like max_wait_time, min_bytes, isolation_level, etc.)
> >
> > The argument for having two APIs is that some fields will be be present
> > in incremental requests and not in full ones, and vice versa.  For
> > example, incremental requests wil

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-22 Thread Jay Kreps
Okay yeah, what I said didn't really work or make sense. Ismael's
interpretation is better.

Couple of things to point out:

   1. I'm less sure that replication has a high partition count and
   consumers don't. There are definitely use cases for consumers that
   subscribe to everything (e.g. load all my data into HDFS) as well as super
   high partition count topics. In a bigger cluster it is unlikely a given
   node is actually replicating that many partitions from another particular
   node (though perhaps in aggregate the effect is the same). I think it would
   clearly be desirable to have a solution that targeted both the consumer and
   replication if that were achievable. I agree with the concern on memory,
   but perhaps there could be a way to be smart about the memory usage?
   2. For the question of one request vs two, one difference in values here
   may be that it sounds like you are proposing a less ideal protocol to
   simplify the broker code. To me the protocol is really *the* fundamental
   interface in Kafka and we should really strive to make that something that
   is beautiful and makes sense on its own (without needing to understand the
   history of how we got there). I think there may well be such an explanation
   for the two API version (as you kind of said with your HDFS analogy) but
   really making it clear how these two APIs are different and how they
   interact is key. Like, basically I think we should be able to explain it
   from scratch in such a way that it is obvious you'd have these two things
   as the fundamental primitives for fetching data.

-Jay

On Wed, Nov 22, 2017 at 11:02 AM, Colin McCabe <cmcc...@apache.org> wrote:

> Hi Jay,
>
> On Tue, Nov 21, 2017, at 19:03, Jay Kreps wrote:
> > I think the general thrust of this makes a ton of sense.
> >
> > I don't love that we're introducing a second type of fetch request. I
> > think the motivation is for compatibility, right? But isn't that what
> > versioning s for? Basically to me although the modification we're making
> makes
> > sense, the resulting protocol doesn't really seem like something you
> would
> > design this way from scratch.
>
> I think there are two big reasons to consider separating
> IncrementalFetchRequest from FetchRequest.
>
> As you say, the first reason is compatibility.  We will have to support
> the full FetchRequest for a long time to come because of our
> compatibility policy.  It would be good from a code quality point of
> view to avoid having widely diverging code paths for different versions
> of this request.
>
> The other reason is that conceptually I feel that there should be both
> full and incremental fetch requests.  This is similar to how HDFS has
> both incremental and full block reports.  The full reports are necessary
> when a node is restarted.  In HDFS, they also serve a periodic sanity
> check if the DataNode's view of what blocks exist has become
> desynchronized from the NameNode's view.  While in theory you could
> avoid the sanity check, in practice it often was important.
>
> Also, just to be clear, I don't think we should convert KafkaConsumer to
> using incremental fetch requests.  It seems inadvisable to allocate
> broker memory for each KafkaConsumer.  After all, there can be quite a
> few consumers, and we don't know ahead of time how many there will be.
> This is very different than brokers, where there are a small,
> more-or-less constant, number.  Also, consumers tend not to consume from
> a massive number of topics all at once, so I don't think they have the
> same problems with the existing FetchRequest RPC as followers do.
>
> >
> > I think I may be misunderstanding the semantics of the partitions in
> > IncrementalFetchRequest. I think the intention is that the server
> > remembers the partitions you last requested, and the partitions you
> specify
> > in the request are added to this set. This is a bit odd though because
> you can
> > add partitions but I don't see how you remove them, so it doesn't really
> let
> > you fully make changes incrementally. I suspect I'm misunderstanding that
> > somehow, though.
>
> Sorry, I may have done a poor job explaining the proposal.  The
> intention is that you cannot change the set of partitions you are
> receiving information about except by making a full FetchRequest.  If
> you need to make any changes to the watch set whatsoever, you must make
> a full request, not an incremental.  The idea is that changes are very
> infrequent, so we don't need to optimize this at the moment.
>
> > You'd also need to be a little bit careful that there was
> > no way for the server's idea of what the client is interested in and the
> > client's idea to ever diverge as you made th

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-21 Thread Jay Kreps
I think the general thrust of this makes a ton of sense.

I don't love that we're introducing a second type of fetch request. I think
the motivation is for compatibility, right? But isn't that what versioning
is for? Basically to me although the modification we're making makes sense,
the resulting protocol doesn't really seem like something you would design
this way from scratch.

I think I may be misunderstanding the semantics of the partitions in
IncrementalFetchRequest. I think the intention is that the server remembers
the partitions you last requested, and the partitions you specify in the
request are added to this set. This is a bit odd though because you can add
partitions but I don't see how you remove them, so it doesn't really let
you fully make changes incrementally. I suspect I'm misunderstanding that
somehow, though. You'd also need to be a little bit careful that there was
no way for the server's idea of what the client is interested in and the
client's idea to ever diverge as you made these modifications over time
(due to bugs or whatever).

It seems like an alternative would be to not add a second request, but
instead change the fetch api and implementation

   1. We save the partitions you last fetched on that connection in the
   session for the connection (as I think you are proposing)
   2. It only gives you back info on partitions that have data or have
   changed (no reason you need the others, right?)
   3. Not specifying any partitions means "give me the usual", as defined
   by whatever you requested before attached to the session.

This would be a new version of the fetch API, so compatibility would be
retained by retaining the older version as is.

This seems conceptually simpler to me. It's true that you have to resend
the full set whenever you want to change it, but that actually seems less
error prone and that should be rare.

I suspect you guys thought about this and it doesn't quite work, but maybe
you could explain why?

-Jay

On Tue, Nov 21, 2017 at 1:02 PM, Colin McCabe  wrote:

> Hi all,
>
> I created a KIP to improve the scalability and latency of FetchRequest:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 227%3A+Introduce+Incremental+FetchRequests+to+Increase+
> Partition+Scalability
>
> Please take a look.
>
> cheers,
> Colin
>


[jira] [Resolved] (KAFKA-414) Evaluate mmap-based writes for Log implementation

2017-11-16 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-414.
-
Resolution: Won't Fix

> Evaluate mmap-based writes for Log implementation
> -
>
> Key: KAFKA-414
> URL: https://issues.apache.org/jira/browse/KAFKA-414
> Project: Kafka
>  Issue Type: New Feature
>    Reporter: Jay Kreps
>Priority: Minor
> Attachments: TestLinearWritePerformance.java, 
> linear_write_performance.txt
>
>
> Working on another project I noticed that small write performance for 
> FileChannel is really very bad. This likely effects Kafka in the case where 
> messages are produced one at a time or in small batches. I wrote a quick 
> program to evaluate the following options:
> raf = RandomAccessFile
> mmap = MappedByteBuffer
> channel = FileChannel
> For both of the later two I tried both direct-allocated and non-direct 
> allocated buffers (direct allocation is supposed to be faster).
> Here are the results I saw:
> [jkreps@jkreps-ld valencia]$ java -XX:+UseConcMarkSweepGC -cp 
> target/test-classes -server -Xmx1G -Xms1G valencia.TestLinearWritePerformance 
> $((256*1024)) $((1*1024*1024*1024)) 2
>   file_length  size (bytes)  raf (mb/sec) 
>   channel_direct (mb/sec)  mmap_direct (mb/sec) channel_heap (mb/sec) 
>mmap_heap (mb/sec)
>   100 1   
> 0.60  0.52 28.66  
> 0.55 50.40
>   200 2   
> 1.18  1.16 67.84  
> 1.13 74.17
>   400 4   
> 2.33  2.26121.52  
> 2.23122.14
>   800 8   
> 4.72  4.51228.39  
> 4.41175.20
>  160016   
> 9.25  8.96393.24  
> 8.88314.11
>  320032  
> 18.43 17.93601.83 
> 17.28482.25
>  640064  
> 36.25 35.21799.98 
> 34.39680.39
> 12800   128  
> 69.80 67.52963.30 
> 66.21870.82
> 25600   256 
> 134.24129.25   1064.13
> 129.01   1014.00
> 51200   512 
> 247.38238.24   1124.71
> 235.57   1091.81
>102400  1024 
> 420.42411.43   1170.94
> 406.57   1138.80
>1073741824  2048 
> 671.93658.96   1133.63
> 650.39   1151.81
>1073741824  4096
> 1007.84989.88   1165.60   
>  976.10   1158.49
>1073741824  8192
> 1137.12   1145.01   1189.38   
> 1128.30   1174.66
>1073741824 16384
> 1172.63   1228.33   1192.19   
> 1206.58   1156.37
>1073741824 32768
> 1221.13   1295.37   1170.96   
> 1262.28   1156.65
>1073741824 65536
> 1255.23   1306.33   1160.22   
> 1268.24 

Re: [DISCUSS] URIs on Producer and Consumer

2017-10-05 Thread Jay Kreps
Hey Clebert,

Is there a motivation for adding a second way? We generally try to avoid
having two ways to do something unless it's really needed...I suspect you
have a reason for wanting this, though.

-Jay

On Mon, Oct 2, 2017 at 6:15 AM Clebert Suconic 
wrote:

> At ActiveMQ and ActiveMQ Artemis, ConnectionFactories have an
> interesting feature where you can pass parameters through an URI.
>
> I was looking at Producer and Consumer APIs, and these two classes are
> using a method that I considered old for Artemis resembling HornetQ:
>
> Instead of passing a Properties (aka HashMaps), users would be able to
> create a Consumer or Producer by simply doing:
>
> new Consumer("tcp::/host:port?properties=values;properties=values...etc");
>
> Example:
>
>
> Instead of the following:
>
> Map config = new HashMap<>();
> config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:");
> config.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, -2);
> new KafkaConsumer<>(config, new ByteArrayDeserializer(), new
> ByteArrayDeserializer());
>
>
>
> Someone could do
>
> new KafkaConsumer<>("tcp://localhost:?receive.buffer.bytes=-2",
> new ByteArrayDeserializer(), new ByteArrayDeserializer());
>
>
>
> I don't know if that little API improvement would be welcomed? I would be
> able to send a Pull Request but I don't want to do it if that wouldn't
> be welcomed in the first place:
>
>
> Just an idea...  let me know if that is welcomed or not.
>
> If so I can forward the discussion into how I would implement it.
>


Re: How is CorrelationId used for matching request and response

2017-09-30 Thread Jay Kreps
Yes the idea of the correlation id is to make it easier for the client to
match a particular response to the request it answers. Kafka’s protocol
allows sending multiple requests without waiting for the response. In
theory you can just rely on ordering, but that can be a bit fragile if the
client has any kind of bug. So this id is an explicit check—a response with
id 42 is the answer to the request you sent with id 42. Hope that helps!

-Jay

On Fri, Sep 29, 2017 at 4:52 PM Ted Yu  wrote:

> Which release / version are you looking at ?
> In trunk branch, I only see one toSend():
>
> protected Send toSend(String destination, ResponseHeader header, short
> apiVersion) {
>
> return new NetworkSend(destination, serialize(apiVersion, header));
>
> On Fri, Sep 29, 2017 at 4:49 PM, Javed, Haseeb <
> javed...@buckeyemail.osu.edu
> > wrote:
>
> > The Kafka protocol guide mentions that each request and response contains
> > a correlationId which is a user-supplied integer to match requests and
> > corresponding responses. However, when I look at the code in the class
> > AbstractResponse, we have a method defined as following:
> >
> >
> > public Send toSend(String destination, RequestHeader requestHeader) {
> > return toSend(destination, requestHeader.apiVersion(), requestHeader.
> > toResponseHeader());
> > }
> >
> > So basically we are just using the requestHeader to generate the
> > responseHeader so doesn't this pretty much guarantees that the
> > correlationId for the Request and the Response would always be the same,
> or
> > am I missing something?
> >
> >
> >
>


Re: [DISCUSS] KIP-185: Make exactly once in order delivery per partition the default producer setting

2017-08-15 Thread Jay Kreps
Hey Guozhang,

I think the argument is that with acks=1 the message could be lost and
hence you aren't guaranteeing exactly once delivery.

-Jay

On Mon, Aug 14, 2017 at 1:36 PM, Guozhang Wang  wrote:

> Just want to clarify that regarding 1), I'm fine with changing it to `all`
> but just wanted to argue it is not necessarily correlate with the
> exactly-once semantics, but rather on persistence v.s. availability
> trade-offs, so I'd like to discuss them separately.
>
> Regarding 2), one minor concern I had is that the enforcement is on the
> client side while the parts it affects is on the broker side. I.e. the
> broker code would assume at most 5 in.flight when idempotent is turned on,
> but this is not enforced at the broker but relying at the client side's
> sanity. So other implementations of the client that may not obey this may
> likely break the broker code. If we do enforce this we'd better enforce it
> at the broker side. Also, I'm wondering if we have considered the approach
> for brokers to read the logs in order to get the starting offset when it
> does not about it in its snapshot, that whether it is worthwhile if we
> assume that such issues are very rare to happen?
>
>
> Guozhang
>
>
>
> On Mon, Aug 14, 2017 at 11:01 AM, Apurva Mehta 
> wrote:
>
> > Hello,
> >
> > I just want to summarize where we are in this discussion
> >
> > There are two major points of contention: should we have acks=1 or
> acsk=all
> > by default? and how to cap max.in.flight.requests.per.connection?
> >
> > 1) acks=1 vs acks=all1
> >
> > Here are the tradeoffs of each:
> >
> > If you have replication-factor=N, your data is resilient N-1 to disk
> > failures. For N>1, here is the tradeoff between acks=1 and acks=all.
> >
> > With proposed defaults and acks=all, the stock Kafka producer and the
> > default broker settings would guarantee that ack'd messages would be in
> the
> > log exactly once.
> >
> > With the proposed defaults and acks=1, the stock Kafka producer and the
> > default broker settings would guarantee that 'retained ack'd messages
> would
> > be in the log exactly once. But all ack'd messages may not be retained'.
> >
> > If you leave replication-factor=1, acks=1 and acks=all have identical
> > semantics and performance, but you are resilient to 0 disk failures.
> >
> > I think the measured cost (again the performance details are in the wiki)
> > of acks=all is well worth the much clearer semantics. What does the rest
> of
> > the community think?
> >
> > 2) capping max.in.flight at 5 when idempotence is enabled.
> >
> > We need to limit the max.in.flight for the broker to de-duplicate
> messages
> > properly. The limitation would only apply when idempotence is enabled.
> The
> > shared numbers show that when the client-broker latency is low, there is
> no
> > performance gain for max.inflight > 2.
> >
> > Further, it is highly debatable that max.in.flight=500 is significantly
> > better than max.in.flight=5  for a really high latency client-broker
> link,
> > and so far there are no hard numbers one way or another. However,
> assuming
> > that max.in.flight=500 is significantly better than max.inflight=5 in
> some
> > niche use case, the user would have to sacrifice idempotence for
> > throughput. In this extreme corner case, I think it is an acceptable
> > tradeoff.
> >
> > What does the community think?
> >
> > Thanks,
> > Apurva
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-185: Make exactly once in order delivery per partition the default producer setting

2017-08-12 Thread Jay Kreps
Becket,

I think this proposal actually does a great deal to address the
configuration complexity. It is true that there are a number of knobs, but
the result of this change is that 99% of people don't need to think about
them (and the mechanism we have to communicate that is to reduce the
importance setting that translates to the docs so people know these are low
level tuning things). Instead we can just focus on trying to make things
safe and fast by default with the full guarantees. Very extreme use cases
may require giving up some of the safety guarantees but I think that's
okay, those people won't necessarily want to change all the configs
together, they'll want to change just the acks setting most likely.

-Jay




On Fri, Aug 11, 2017 at 5:39 PM, Becket Qin  wrote:

> BTW, I feel that the configurations we have around those guarantees have
> become too complicated for the users. Not sure if this is considered before
> but Maybe we can have some helper functions provided to the users. For
> example:
>
> Properties TopicConfig.forSemantc(Semantic semantic);
> Properties ProducerConfig.forSemantc(Semantic semantic);
>
> Where the semantics are "AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE". So
> users could just pick the one they want. This would be as if we have more
> than one default config sets.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Aug 11, 2017 at 5:26 PM, Becket Qin  wrote:
>
> > Hi Apurva,
> >
> > Thanks for the reply. When I was thinking of exactly once I am thinking
> of
> > "exactly once with availability", Users probably wouldn't want to
> sacrifice
> > availability for exactly once. To achieve exactly once with same
> > availability and acks=all, users actually need to pay more cost. To
> > tolerate one broker failure, one has to set replication.factor to at
> least
> > 3 and min.isr to at least 2. Do you mean we should also set those to
> > default value? Would it be a little weird because redundancy level is a
> > pretty customized decision so there is no one single correct default
> > configuration for that.
> >
> > The concern I have is that acks=-1 is not only associated with exactly
> > once semantic. I am not sure if the side effect it brings justifies a
> > default config, such as performance, cost, etc.
> >
> > From users' perspective, when idempotence=true and
> > max.in.flight.requests.per.connection > 0, ideally what acks=1 should
> > really mean is that "as long as there is no hardware failure, my message
> is
> > sent exactly once". Do you think this semantic is good enough as a
> default
> > configuration to ship? It is unfortunate this statement is not true today
> > as even when we do leader migration without any broker failure, the
> leader
> > will naively truncate the data that has not been replicated. It is a long
> > existing issue and we should try to fix that.
> >
> > For the max.in.flight.requests.per.connection, can you elaborate a
> little
> > on "Given the nature of the idempotence feature, we have to bound it.".
> > What is the concern here? It seems that when nothing wrong happens,
> > pipelining should just work. And the memory is bounded by the memory
> buffer
> > pool anyways. Sure one has to resend all the subsequent batches if one
> > batch is out of sequence, but that should be rare and we probably should
> > not optimize for that.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Aug 11, 2017 at 2:08 PM, Apurva Mehta 
> wrote:
> >
> >> Thanks for your email Becket. I would be interested in hearing others
> >> opinions on which should be a better default between acks=1 and
> acks=all.
> >>
> >> One important point on which I disagree is your statement that 'users
> need
> >> to do a lot of work to get exactly-once with acks=all'. This is
> debatable.
> >> If we enable acks=all,  and if we ship with sane topic-level configs
> (like
> >> disabling unclean leader election), then users will get produce
> exceptions
> >> with the default settings only for authorization and config exceptions,
> or
> >> exceptions due to correlated hard failures or software bugs (assuming
> >> replication-factor > 1, which is when acks=all and acks=1 differ). This
> >> should be sufficiently rare that expecting apps to shut down and have
> >> manual intervention to ensure data consistency is not unreasonable.
> >>
> >> So users will not have to have complicated code to ensure exactly-once
> in
> >> their app with my proposed defaults: just shut down the producer when a
> >> `send` returns an error and check manually if you really care about
> >> exactly-once. The latter should happen so rarely that I argue that it
> >> would
> >> be worth the cost. And if all else fails, there are still ways to
> recover
> >> automatically, but those are then very complex as you pointed out.
> >>
> >> Regarding max.in.flight: again, given the nature of the idempotence
> >> feature, we have to bound it. One 

Re: [DISCUSS] KIP-185: Make exactly once in order delivery per partition the default producer setting

2017-08-12 Thread Jay Kreps
+1

I think there is some pain with changing any default, but this is well
worth it.

The reality is that the profile of Kafka usage has changed significantly
since replication was added to Kafka and these defaults were chosen. At
that time usage was primarily high volume event and log data and the
performance of acks=all was quite bad. Both of those two things have
changed. Now I think we see far more critical production uses, and the cost
of the better semantics is really pretty minimal.

Kafka has very strong guarantees now to support that type of use case but
you really have to understand the knobs and go in and set them correctly.

A good example of this is Gwen's talk on tuning Kafka for reliability.
Basically that 45 minute talk is the least amount you need to know to not
risk your data. Needless to say we see people goofing things up and being
unhappy about it.

The reality is that our performance in the "safe" mode is actually quite
spectacular. Back of the envelope it looks like about 570k msg/sec for 128
byte messages for acks=all vs 680k msgs/sec for acks=1. I suspect with
tuning we could shrink that, but even as is I think the number of apps that
need throughput north of 500k msgs/sec/instance is small.

The only thing I'd add is that let's make sure the out-of-the box usage on
your own machine isn't complicated (i.e. not introduce any setting that has
to be adjusted to run in single node configuration) as a lot of people
depend on that.

-Jay

On Tue, Aug 8, 2017 at 5:23 PM Apurva Mehta  wrote:

> Hi,
>
> I've put together a new KIP which proposes to ship Kafka with its strongest
> delivery guarantees by default.
>
> We currently ship with at most once semantics and don't provide any
> ordering guarantees per partition. The proposal is is to provide exactly
> once in order delivery per partition by default in the upcoming 1.0.0
> release.
>
> The KIP linked to below also outlines the performance characteristics of
> the proposed default.
>
> The KIP is here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 185%3A+Make+exactly+once+in+order+delivery+per+partition+
> the+default+producer+setting
>
> Please have a look, I would love your feedback!
>
> Thanks,
> Apurva
>


Re: [DISCUSS] 2017 October release planning and release version

2017-07-21 Thread Jay Kreps
1.0! Let's do it!

-Jay

On Tue, Jul 18, 2017 at 3:36 PM, Guozhang Wang  wrote:

> Hi all,
>
> With 0.11.0.0 out of the way, I would like to volunteer to be the
> release manager
> for our next time-based feature release. See https://cwiki.apache.org/
> confluence/display/KAFKA/Time+Based+Release+Plan if you missed
> previous communication
> on time-based releases or need a reminder.
>
> I put together a draft release plan with October 2017 as the release month
> (as previously agreed) and a list of KIPs that have already been voted:
>
> https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+2017.Oct
> As of today we already have 10 KIPs voted, including 2 merged and 3 with
> PRs under review. As we start the process more KIPs are expected to be
> added until the KIP freeze date.
>
> In addition to the current release plan, I would also like to propose to
> set the release version to 1.0.0. More specifically, we will bump up the
> major version from 0.11 to 1.0, and change the semantics of release digits
> as:
>
> major.minor.bugfix[.release-candidate]
>
> To be better aligned with software versioning (https://en.wikipedia.org/
> wiki/Software_versioning). Moving forward we can use three digits instead
> of four in most places that do not require to indicate the rc number. Here
> is my motivation:
>
> 1) Kafka has significantly evolved from its first Apache release of 0.7.0
> (incubating) as a pub-sub messaging system into a distributed streaming
> platform that can enable publish / store / process real-time data streams,
> with the addition of replication (0.8.0), quota / security support for
> multi-tenancy (0.8.2, 0.9.0), Connect and Streams API (0.9.0, 0.10.0), and
> most recently the exactly-once support to have the desired
> semantics (0.11.0); I think now is a good time to mark the release as a
> major milestone in the evolution of Apache Kafka.
>
> 2) Some people believe 0.x means that the software is immature or not
> stable, or the public APIs may subject to change incompatibly regardless
> the fact that Kafka has been widely adopted in productions and the
> community has made a great effort on maintaining backward compatibility.
> Making Kafka 1.x will help with promoting the project for that perception.
>
> 3) Having three digits as of "major.minor.bugfix" is more natural from a
> software version understanding pov and aligned with other open source
> projects as well.
>
> How do people feel about 1.0.0.x as the next Kafka version? Please share
> your thoughts.
>
> -- Guozhang
>


Re: Consumer throughput drop

2017-07-20 Thread Jay Kreps
I suspect this is on Linux right?

The way Linux works is it uses a percent of memory to buffer new writes, at
a certain point it thinks it has too much buffered data and it gives high
priority to writing that out. The good news about this is that the writes
are very linear, well layed out, and high-throughput. The problem is that
it leads to a bit of see-saw behavior.

Now obviously the drop in performance isn't wrong. When your disk is
writing data out it is doing work and obviously the read throughput will be
higher when you are just reading and not writing then when you are doing
both reading and writing simultaneously. So obviously you can't get the
no-writing performance when you are also writing (unless you add I/O
capacity).

But still these big see-saws in performance are not ideal. You'd rather
have more constant performance all the time rather than have linux bounce
back and forth from writing nothing and then frantically writing full bore.
Fortunately linux provides a set of pagecache tuning parameters that let
you control this a bit.

I think these docs cover some of the parameters:
https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/6/html/Performance_Tuning_Guide/s-memory-tunables.html

-Jay

On Thu, Jul 20, 2017 at 10:24 AM, Ovidiu-Cristian MARCU <
ovidiu-cristian.ma...@inria.fr> wrote:

> Hi guys,
>
> I’m relatively new to Kafka’s world. I have an issue I describe below,
> maybe you can help me understand this behaviour.
>
> I’m running a benchmark using the following setup: one producer sends data
> to a topic and concurrently one consumer pulls and writes it to another
> topic.
> Measuring the consumer throughput, I observe values around 500K records/s
> only until the system’s cache gets filled - from this moment the consumer
> throughout drops to ~200K (2.5 times lower).
> Looking at disk usage, I observe disk read I/O which corresponds to the
> moment the consumer throughout drops.
> After some time, I kill the producer and immediately I observe the
> consumer throughput goes up to initial values ~ 500K records/s.
>
> What can I do to avoid this throughput drop?
>
> Attached an image showing disk I/O and CPU usage. I have about 128GB RAM
> on that server which gets filled at time ~2300.
>
> Thanks,
> Ovidiu
>
>


Re: KIP-162: Enable topic deletion by default

2017-07-19 Thread Jay Kreps
+1

On Sat, May 27, 2017 at 11:04 AM, Gwen Shapira  wrote:

> Thanks Vahid,
>
> Do you mind if we leave the command-line out of scope for this?
>
> I can see why adding confirmations, options to bypass confirmations, etc
> would be an improvement. However, I've seen no complaints about the current
> behavior of the command-line and the KIP doesn't change it at all. So I'd
> rather address things separately.
>
> Gwen
>
> On Fri, May 26, 2017 at 8:10 PM Vahid S Hashemian <
> vahidhashem...@us.ibm.com>
> wrote:
>
> > Gwen, thanks for the KIP.
> > It looks good to me.
> >
> > Just a minor suggestion: It would be great if the command asks for a
> > confirmation (y/n) before deleting the topic (similar to how removing
> ACLs
> > works).
> >
> > Thanks.
> > --Vahid
> >
> >
> >
> > From:   Gwen Shapira 
> > To: "dev@kafka.apache.org" , Users
> > 
> > Date:   05/26/2017 07:04 AM
> > Subject:KIP-162: Enable topic deletion by default
> >
> >
> >
> > Hi Kafka developers, users and friends,
> >
> > I've added a KIP to improve our out-of-the-box usability a bit:
> > KIP-162: Enable topic deletion by default:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 162+-+Enable+topic+deletion+by+default
> >
> >
> > Pretty simple :) Discussion and feedback are welcome.
> >
> > Gwen
> >
> >
> >
> >
> >
>


Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Jay Kreps
I think the question is when do you actually *want* processing time
semantics? There are definitely times when its safe to assume the two are
close enough that a little lossiness doesn't matter much but it is pretty
hard to make assumptions about when the processing time is and has been
hard for us to think of a use case where its actually desirable.

I think mostly what we've seen is confusion about the core concepts:

   - stream -- immutable events that occur
   - tables (including windows) -- current state of the world

If the root problem is confusion adding knobs never makes it better. If the
root problem is we're missing important use cases that justify the
additional knobs then i think it's good to try to really understand them. I
think there could be use cases around systems that don't take updates,
example would be email, twitter, and some metrics stores.

One solution that would be less complexity inducing than allowing new
semantics, but might help with the use cases we need to collect, would be
to add a new operator in the DSL. Something like .freezeAfter(30,
TimeUnit.SECONDS) that collects all updates for a given window and both
emits and enforces a single output after 30 seconds after the advancement
of stream time and remembers that it is omitted, suppressing all further
output (so the output is actually a KStream). This might or might not
depend on wall clock time. Perhaps this is in fact what you are proposing?

-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple times,
> right?
>
> Personally, I agree totally with the philosophy of "no final aggregation",
> as expressed by Eno's post, but IMO that is predicated totally on
> event-time semantics.
>
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record just
> falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
>
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is another
> case where processing-time semantics warrant explicit handling in the api -
> but of course, only if there's sufficient user demand for this.
>
> What I could imagine is a new type of time window (ProcessingTimeWindow?),
> that if used in an aggregation, the underlying processor would force the
> WallclockTimestampExtractor (KAFKA-4144 enables that) and would use the
> system-time punctuation (KIP-138) to send the final aggregation value once
> the window has expired and could be configured to not send intermediate
> updates while the window was open.
>
> Of course this is just a helper for the users, since they can implement it
> all themselves using the low-level API, as Matthias pointed out already.
> Just seems there's recurring interest in this.
>
> Again, this only makes sense for processing time semantics. For event-time
> semantics I find the arguments for "no final aggregation" totally
> convincing.
>
>
> Cheers,
>
> Michał
>
> On 16/06/17 00:08, Matthias J. Sax wrote:
>
> Hi Paolo,
>
> This SO question might help, 
> too:https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>
> For Streams, the basic model is based on "change" and we report updates
> to the "current" result immediately reducing latency to a minimum.
>
> Last, if you say it's going to fall into the next window, you won't get
> event time semantics but you fall back processing time semantics, that
> cannot provide exact results
>
> If you really want to trade-off correctness version getting (late)
> updates and want to use processing time semantics, you should configure
> WallclockTimestampExtractor and implement a "update deduplication"
> operator using table.toStream().transform(). You can attached a state to
> your transformer and store all update there (ie, newer update overwrite
> older updates). Punctuations allow you to emit "final" results for
> windows for which "window end time" passed.
>
>
> -Matthias
>
> On 6/15/17 9:21 AM, Paolo Patierno wrote:
>
> Hi Eno,
>
>
> regarding closing window I think that it's up to the streaming application. I 
> mean ...
>
> If I want something like I described, I know that a value outside my 5 
> seconds window will be taken into account for the next processing (in the 
> next 5 seconds). I don't think I'm losing a record, I am ware that this 
> record will fall in the next "processing" window. Btw I'll take a look at 
> your article ! Thanks !
>
>
> Paolo
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : 

Re: Kafka Streams vs Spark Streaming : reduce by window

2017-06-16 Thread Jay Kreps
I think the question is when do you actually *want* processing time
semantics? There are definitely times when its safe to assume the two are
close enough that a little lossiness doesn't matter much but it is pretty
hard to make assumptions about when the processing time is and has been
hard for us to think of a use case where its actually desirable.

I think mostly what we've seen is confusion about the core concepts:

   - stream -- immutable events that occur
   - tables (including windows) -- current state of the world

If the root problem is confusion adding knobs never makes it better. If the
root problem is we're missing important use cases that justify the
additional knobs then i think it's good to try to really understand them. I
think there could be use cases around systems that don't take updates,
example would be email, twitter, and some metrics stores.

One solution that would be less complexity inducing than allowing new
semantics, but might help with the use cases we need to collect, would be
to add a new operator in the DSL. Something like .freezeAfter(30,
TimeUnit.SECONDS) that collects all updates for a given window and both
emits and enforces a single output after 30 seconds after the advancement
of stream time and remembers that it is omitted, suppressing all further
output (so the output is actually a KStream). This might or might not
depend on wall clock time. Perhaps this is in fact what you are proposing?

-Jay



On Fri, Jun 16, 2017 at 2:38 AM, Michal Borowiecki <
michal.borowie...@openbet.com> wrote:

> I wonder if it's a frequent enough use case that Kafka Streams should
> consider providing this out of the box - this was asked for multiple times,
> right?
>
> Personally, I agree totally with the philosophy of "no final aggregation",
> as expressed by Eno's post, but IMO that is predicated totally on
> event-time semantics.
>
> If users want processing-time semantics then, as the docs already point
> out, there is no such thing as a late-arriving record - every record just
> falls in the currently open window(s), hence the notion of final
> aggregation makes perfect sense, from the usability point of view.
>
> The single abstraction of "stream time" proves leaky in some cases (e.g.
> for punctuate method - being addressed in KIP-138). Perhaps this is another
> case where processing-time semantics warrant explicit handling in the api -
> but of course, only if there's sufficient user demand for this.
>
> What I could imagine is a new type of time window (ProcessingTimeWindow?),
> that if used in an aggregation, the underlying processor would force the
> WallclockTimestampExtractor (KAFKA-4144 enables that) and would use the
> system-time punctuation (KIP-138) to send the final aggregation value once
> the window has expired and could be configured to not send intermediate
> updates while the window was open.
>
> Of course this is just a helper for the users, since they can implement it
> all themselves using the low-level API, as Matthias pointed out already.
> Just seems there's recurring interest in this.
>
> Again, this only makes sense for processing time semantics. For event-time
> semantics I find the arguments for "no final aggregation" totally
> convincing.
>
>
> Cheers,
>
> Michał
>
> On 16/06/17 00:08, Matthias J. Sax wrote:
>
> Hi Paolo,
>
> This SO question might help, 
> too:https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable
>
> For Streams, the basic model is based on "change" and we report updates
> to the "current" result immediately reducing latency to a minimum.
>
> Last, if you say it's going to fall into the next window, you won't get
> event time semantics but you fall back processing time semantics, that
> cannot provide exact results
>
> If you really want to trade-off correctness version getting (late)
> updates and want to use processing time semantics, you should configure
> WallclockTimestampExtractor and implement a "update deduplication"
> operator using table.toStream().transform(). You can attached a state to
> your transformer and store all update there (ie, newer update overwrite
> older updates). Punctuations allow you to emit "final" results for
> windows for which "window end time" passed.
>
>
> -Matthias
>
> On 6/15/17 9:21 AM, Paolo Patierno wrote:
>
> Hi Eno,
>
>
> regarding closing window I think that it's up to the streaming application. I 
> mean ...
>
> If I want something like I described, I know that a value outside my 5 
> seconds window will be taken into account for the next processing (in the 
> next 5 seconds). I don't think I'm losing a record, I am ware that this 
> record will fall in the next "processing" window. Btw I'll take a look at 
> your article ! Thanks !
>
>
> Paolo
>
>
> Paolo Patierno
> Senior Software Engineer (IoT) @ Red Hat
> Microsoft MVP on Windows Embedded & IoT
> Microsoft Azure Advisor
>
> Twitter : 

Re: [Vote] KIP-150 - Kafka-Streams Cogroup

2017-06-09 Thread Jay Kreps
+1

-Jay

On Thu, Jun 8, 2017 at 11:16 AM, Guozhang Wang  wrote:

> I think we can continue on this voting thread.
>
> Currently we have one binding vote and 2 non-binging votes. I would like to
> call out for other people especially committers to also take a look at this
> proposal and vote.
>
>
> Guozhang
>
>
> On Wed, Jun 7, 2017 at 6:37 PM, Kyle Winkelman 
> wrote:
>
> > Just bringing people's attention to the vote thread for my KIP. I started
> > it before another round of discussion happened. Not sure the protocol so
> > someone let me know if I am supposed to restart the vote.
> > Thanks,
> > Kyle
> >
> > On May 24, 2017 8:49 AM, "Bill Bejeck"  wrote:
> >
> > > +1  for the KIP and +1 what Xavier said as well.
> > >
> > > On Wed, May 24, 2017 at 3:57 AM, Damian Guy 
> > wrote:
> > >
> > > > Also, +1 for the KIP
> > > >
> > > > On Wed, 24 May 2017 at 08:57 Damian Guy 
> wrote:
> > > >
> > > > > +1 to what Xavier said
> > > > >
> > > > > On Wed, 24 May 2017 at 06:45 Xavier Léauté 
> > > wrote:
> > > > >
> > > > >> I don't think we should wait for entries from each stream, since
> > that
> > > > >> might
> > > > >> limit the usefulness of the cogroup operator. There are instances
> > > where
> > > > it
> > > > >> can be useful to compute something based on data from one or more
> > > > stream,
> > > > >> without having to wait for all the streams to produce something
> for
> > > the
> > > > >> group. In the example I gave in the discussion, it is possible to
> > > > compute
> > > > >> impression/auction statistics without having to wait for click
> data,
> > > > which
> > > > >> can typically arrive several minutes late.
> > > > >>
> > > > >> We could have a separate discussion around adding inner / outer
> > > > modifiers
> > > > >> to each of the streams to decide which fields are optional /
> > required
> > > > >> before sending updates if we think that might be useful.
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Tue, May 23, 2017 at 6:28 PM Guozhang Wang  >
> > > > wrote:
> > > > >>
> > > > >> > The proposal LGTM, +1
> > > > >> >
> > > > >> > One question I have is about when to send the record to the
> > resulted
> > > > >> KTable
> > > > >> > changelog. For example in your code snippet in the wiki page,
> > before
> > > > you
> > > > >> > see the end result of
> > > > >> >
> > > > >> > 1L, Customer[
> > > > >> >
> > > > >> >   cart:{Item[no:01], Item[no:03],
> > Item[no:04]},
> > > > >> >   purchases:{Item[no:07], Item[no:08]},
> > > > >> >   wishList:{Item[no:11]}
> > > > >> >   ]
> > > > >> >
> > > > >> >
> > > > >> > You will firs see
> > > > >> >
> > > > >> > 1L, Customer[
> > > > >> >
> > > > >> >   cart:{Item[no:01]},
> > > > >> >   purchases:{},
> > > > >> >   wishList:{}
> > > > >> >   ]
> > > > >> >
> > > > >> > 1L, Customer[
> > > > >> >
> > > > >> >   cart:{Item[no:01]},
> > > > >> >   purchases:{Item[no:07],Item[no:08]},
> > > > >> >
> > > > >> >   wishList:{}
> > > > >> >   ]
> > > > >> >
> > > > >> > 1L, Customer[
> > > > >> >
> > > > >> >   cart:{Item[no:01]},
> > > > >> >   purchases:{Item[no:07],Item[no:08]},
> > > > >> >
> > > > >> >   wishList:{}
> > > > >> >   ]
> > > > >> >
> > > > >> > ...
> > > > >> >
> > > > >> >
> > > > >> > I'm wondering if it makes more sense to only start sending the
> > > update
> > > > if
> > > > >> > the corresponding agg-key has seen at least one input from each
> of
> > > the
> > > > >> > input stream? Maybe it is out of the scope of this KIP and we
> can
> > > make
> > > > >> it a
> > > > >> > more general discussion in a separate one.
> > > > >> >
> > > > >> >
> > > > >> > Guozhang
> > > > >> >
> > > > >> >
> > > > >> > On Fri, May 19, 2017 at 8:37 AM, Xavier Léauté <
> > xav...@confluent.io
> > > >
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hi Kyle, I left a few more comments in the discussion thread,
> if
> > > you
> > > > >> > > wouldn't mind taking a look
> > > > >> > >
> > > > >> > > On Fri, May 19, 2017 at 5:31 AM Kyle Winkelman <
> > > > >> winkelman.k...@gmail.com
> > > > >> > >
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Hello all,
> > > > >> > > >
> > > > >> > > > I would like to start the vote on KIP-150.
> > > > >> > > >
> > > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 150+-+
> > > > >> > > Kafka-Streams+Cogroup
> > > > >> > > >
> > > > >> > > > Thanks,
> > > > >> > > > Kyle
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > --
> > > > >> > -- Guozhang
> > > > >> >
> > > > >>
> > > > >
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


[jira] [Comment Edited] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-06-08 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16043494#comment-16043494
 ] 

Jay Kreps edited comment on KAFKA-1955 at 6/8/17 9:43 PM:
--

I think the patch I submitted was kind of a cool hack, but after thinking about 
it I wasn't convinced it was really what you actually want.

Here are the considerations I thought we should probably think through:
1. How will recovery work? The patch I gave didn't have a mechanism to recover 
from a failure. I don't think this is really good enough. It means that it is 
okay if Kafka goes down, or if the app goes down, but not both. This helps but 
seems like not really what you want. But to properly handle app failure isn't 
that easy. For example, in the case of a OS crash the OS gives very weak 
guarantees on what is on disk for any data that hasn't been fsync'd. Not only 
can arbitrary bits of data be missing but it is even possible with some FS 
configurations to get arbitrary corrupt blocks that haven't been zero'd yet. I 
think to get this right you need a commit log and recovery procedure that 
verifies unsync'd data on startup. I'm not 100% sure you can do this with just 
the buffer pool, though maybe you can.
2. What are the ordering guarantees for buffered data?
3. How does this interact with transactions/EOS?
4. Should it be the case that all writes go through the commit log or should it 
be the case that only failures are journaled. If you journal all writes prior 
to sending to the server, the problem is that that amounts to significant 
overhead and leads to the possibility that logging or other I/O can slow you 
down. If you journal only failures you have the problem that your throughput 
may be very high in the non-failure scenario and then when Kafka goes down 
suddenly you start doing I/O but that is much slower and your throughput drops 
precipitously. Either may be okay but it is worth thinking through what the 
right behavior is.


was (Author: jkreps):
I think the patch I submitted was kind of a cool hack, but after thinking about 
it I wasn't convinced it was really what you actually want.

Here are the considerations I thought we should probably think through:
1. How will recovery work? The patch I gave didn't have a mechanism to recover 
from a failure. In the case of a OS crash the OS gives very weak guarantees on 
what is on disk for any data that hasn't been fsync'd. Not only can arbitrary 
bits of data be missing but it is even possible with some FS configurations to 
get arbitrary corrupt blocks that haven't been zero'd yet. I think to get this 
right you need a commit log and recovery proceedure that verifies unsync'd data 
on startup. I'm not 100% sure you can do this with just the buffer pool, though 
maybe you can.
2. What are the ordering guarantees for buffered data?
3. How does this interact with transactions/EOS?
4. Should it be the case that all writes go through the commit log or should it 
be the case that only failures are journaled. If you journal all writes prior 
to sending to the server, the problem is that that amounts to significant 
overhead and leads to the possibility that logging or other I/O can slow you 
down. If you journal only failures you have the problem that your throughput 
may be very high in the non-failure scenario and then when Kafka goes down 
suddenly you start doing I/O but that is much slower and your throughput drops 
precipitously. Either may be okay but it is worth thinking through what the 
right behavior is.

> Explore disk-based buffering in new Kafka Producer
> --
>
> Key: KAFKA-1955
> URL: https://issues.apache.org/jira/browse/KAFKA-1955
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.2.0
>    Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1955.patch, 
> KAFKA-1955-RABASED-TO-8th-AUG-2015.patch
>
>
> There are two approaches to using Kafka for capturing event data that has no 
> other "source of truth store":
> 1. Just write to Kafka and try hard to keep the Kafka cluster up as you would 
> a database.
> 2. Write to some kind of local disk store and copy from that to Kafka.
> The cons of the second approach are the following:
> 1. You end up depending on disks on all the producer machines. If you have 
> 1 producers, that is 10k places state is kept. These tend to fail a lot.
> 2. You can get data arbitrarily delayed
> 3. You still don't tolerate hard outages since there is no replication in the 
> producer tier
> 4. This tends to make problems with duplicates more common in certain failure 
> scenarios.
> There is one big pro, though: you don't have to keep 

[jira] [Comment Edited] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-06-08 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16043494#comment-16043494
 ] 

Jay Kreps edited comment on KAFKA-1955 at 6/8/17 9:41 PM:
--

I think the patch I submitted was kind of a cool hack, but after thinking about 
it I wasn't convinced it was really what you actually want.

Here are the considerations I thought we should probably think through:
1. How will recovery work? The patch I gave didn't have a mechanism to recover 
from a failure. In the case of a OS crash the OS gives very weak guarantees on 
what is on disk for any data that hasn't been fsync'd. Not only can arbitrary 
bits of data be missing but it is even possible with some FS configurations to 
get arbitrary corrupt blocks that haven't been zero'd yet. I think to get this 
right you need a commit log and recovery proceedure that verifies unsync'd data 
on startup. I'm not 100% sure you can do this with just the buffer pool, though 
maybe you can.
2. What are the ordering guarantees for buffered data?
3. How does this interact with transactions/EOS?
4. Should it be the case that all writes go through the commit log or should it 
be the case that only failures are journaled. If you journal all writes prior 
to sending to the server, the problem is that that amounts to significant 
overhead and leads to the possibility that logging or other I/O can slow you 
down. If you journal only failures you have the problem that your throughput 
may be very high in the non-failure scenario and then when Kafka goes down 
suddenly you start doing I/O but that is much slower and your throughput drops 
precipitously. Either may be okay but it is worth thinking through what the 
right behavior is.


was (Author: jkreps):
I think the patch I submitted was kind of a cool hack, but after thinking about 
it I don't think it is really what you want.

Here are the considerations I thought we should probably think through:
1. How will recovery work? The patch I gave didn't have a mechanism to recover 
from a failure. In the case of a OS crash the OS gives very weak guarantees on 
what is on disk for any data that hasn't been fsync'd. Not only can arbitrary 
bits of data be missing but it is even possible with some FS configurations to 
get arbitrary corrupt blocks that haven't been zero'd yet. I think to get this 
right you need a commit log and recovery proceedure that verifies unsync'd data 
on startup. I'm not 100% sure you can do this with just the buffer pool, though 
maybe you can.
2. What are the ordering guarantees for buffered data?
3. How does this interact with transactions/EOS?
4. Should it be the case that all writes go through the commit log or should it 
be the case that only failures are journaled. If you journal all writes prior 
to sending to the server, the problem is that that amounts to significant 
overhead and leads to the possibility that logging or other I/O can slow you 
down. If you journal only failures you have the problem that your throughput 
may be very high in the non-failure scenario and then when Kafka goes down 
suddenly you start doing I/O but that is much slower and your throughput drops 
precipitously. Either may be okay but it is worth thinking through what the 
right behavior is.

> Explore disk-based buffering in new Kafka Producer
> --
>
> Key: KAFKA-1955
> URL: https://issues.apache.org/jira/browse/KAFKA-1955
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.2.0
>    Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1955.patch, 
> KAFKA-1955-RABASED-TO-8th-AUG-2015.patch
>
>
> There are two approaches to using Kafka for capturing event data that has no 
> other "source of truth store":
> 1. Just write to Kafka and try hard to keep the Kafka cluster up as you would 
> a database.
> 2. Write to some kind of local disk store and copy from that to Kafka.
> The cons of the second approach are the following:
> 1. You end up depending on disks on all the producer machines. If you have 
> 1 producers, that is 10k places state is kept. These tend to fail a lot.
> 2. You can get data arbitrarily delayed
> 3. You still don't tolerate hard outages since there is no replication in the 
> producer tier
> 4. This tends to make problems with duplicates more common in certain failure 
> scenarios.
> There is one big pro, though: you don't have to keep Kafka running all the 
> time.
> So far we have done nothing in Kafka to help support approach (2), but people 
> have built a lot of buffering things. It's not clear that this is necessarily 
> bad.
> However implementing this in the n

[jira] [Commented] (KAFKA-1955) Explore disk-based buffering in new Kafka Producer

2017-06-08 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1955?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16043494#comment-16043494
 ] 

Jay Kreps commented on KAFKA-1955:
--

I think the patch I submitted was kind of a cool hack, but after thinking about 
it I don't think it is really what you want.

Here are the considerations I thought we should probably think through:
1. How will recovery work? The patch I gave didn't have a mechanism to recover 
from a failure. In the case of a OS crash the OS gives very weak guarantees on 
what is on disk for any data that hasn't been fsync'd. Not only can arbitrary 
bits of data be missing but it is even possible with some FS configurations to 
get arbitrary corrupt blocks that haven't been zero'd yet. I think to get this 
right you need a commit log and recovery proceedure that verifies unsync'd data 
on startup. I'm not 100% sure you can do this with just the buffer pool, though 
maybe you can.
2. What are the ordering guarantees for buffered data?
3. How does this interact with transactions/EOS?
4. Should it be the case that all writes go through the commit log or should it 
be the case that only failures are journaled. If you journal all writes prior 
to sending to the server, the problem is that that amounts to significant 
overhead and leads to the possibility that logging or other I/O can slow you 
down. If you journal only failures you have the problem that your throughput 
may be very high in the non-failure scenario and then when Kafka goes down 
suddenly you start doing I/O but that is much slower and your throughput drops 
precipitously. Either may be okay but it is worth thinking through what the 
right behavior is.

> Explore disk-based buffering in new Kafka Producer
> --
>
> Key: KAFKA-1955
> URL: https://issues.apache.org/jira/browse/KAFKA-1955
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 0.8.2.0
>    Reporter: Jay Kreps
>Assignee: Jay Kreps
> Attachments: KAFKA-1955.patch, 
> KAFKA-1955-RABASED-TO-8th-AUG-2015.patch
>
>
> There are two approaches to using Kafka for capturing event data that has no 
> other "source of truth store":
> 1. Just write to Kafka and try hard to keep the Kafka cluster up as you would 
> a database.
> 2. Write to some kind of local disk store and copy from that to Kafka.
> The cons of the second approach are the following:
> 1. You end up depending on disks on all the producer machines. If you have 
> 1 producers, that is 10k places state is kept. These tend to fail a lot.
> 2. You can get data arbitrarily delayed
> 3. You still don't tolerate hard outages since there is no replication in the 
> producer tier
> 4. This tends to make problems with duplicates more common in certain failure 
> scenarios.
> There is one big pro, though: you don't have to keep Kafka running all the 
> time.
> So far we have done nothing in Kafka to help support approach (2), but people 
> have built a lot of buffering things. It's not clear that this is necessarily 
> bad.
> However implementing this in the new Kafka producer might actually be quite 
> easy. Here is an idea for how to do it. Implementation of this idea is 
> probably pretty easy but it would require some pretty thorough testing to see 
> if it was a success.
> The new producer maintains a pool of ByteBuffer instances which it attempts 
> to recycle and uses to buffer and send messages. When unsent data is queuing 
> waiting to be sent to the cluster it is hanging out in this pool.
> One approach to implementing a disk-baked buffer would be to slightly 
> generalize this so that the buffer pool has the option to use a mmap'd file 
> backend for it's ByteBuffers. When the BufferPool was created with a 
> totalMemory setting of 1GB it would preallocate a 1GB sparse file and memory 
> map it, then chop the file into batchSize MappedByteBuffer pieces and 
> populate it's buffer with those.
> Everything else would work normally except now all the buffered data would be 
> disk backed and in cases where there was significant backlog these would 
> start to fill up and page out.
> We currently allow messages larger than batchSize and to handle these we do a 
> one-off allocation of the necessary size. We would have to disallow this when 
> running in mmap mode. However since the disk buffer will be really big this 
> should not be a significant limitation as the batch size can be pretty big.
> We would want to ensure that the pooling always gives out the most recently 
> used ByteBuffer (I think it does). This way under normal operation where 
> requests are processed qui

Re: [DISCUSS] KIP-113: Support replicas movement between log directories

2017-06-07 Thread Jay Kreps
I think Ram's point is that in place failure is pretty complicated, and
this is meant to be a cost saving feature, we should construct an argument
for it grounded in data.

Assume an annual failure rate of 1% (reasonable, but data is available
online), and assume it takes 3 days to get the drive replaced. Say you have
10 drives per server. Then the expected downtime for each server is roughly
1% * 3 days * 10 = 0.3 days/year (this is slightly off since I'm ignoring
the case of multiple failures, but I don't know that changes it much). So
the savings from this feature is 0.3/365 = 0.08%. Say you have 1000 servers
and they cost $3000/year fully loaded including power, the cost of the hw
amortized over it's life, etc. Then this feature saves you $3000 on your
total server cost of $3m which seems not very worthwhile compared to other
optimizations...?

Anyhow, not sure the arithmetic is right there, but i think that is the
type of argument that would be helpful to think about the tradeoff in
complexity.

-Jay



On Tue, Jun 6, 2017 at 7:09 PM, Dong Lin  wrote:

> Hey Sriram,
>
> Thanks for taking time to review the KIP. Please see below my answers to
> your questions:
>
> >1. Could you pick a hardware/Kafka configuration and go over what is the
> >average disk/partition repair/restore time that we are targeting for a
> >typical JBOD setup?
>
> We currently don't have this data. I think the disk/partition repair/store
> time depends on availability of hardware, the response time of
> site-reliability engineer, the amount of data on the bad disk etc. These
> vary between companies and even clusters within the same company and it is
> probably hard to determine what is the average situation.
>
> I am not very sure why we need this. Can you explain a bit why this data is
> useful to evaluate the motivation and design of this KIP?
>
> >2. How often do we believe disks are going to fail (in your example
> >configuration) and what do we gain by avoiding the network overhead and
> >doing all the work of moving the replica within the broker to another disk
> >instead of balancing it globally?
>
> I think the chance of disk failure depends mainly on the disk itself rather
> than the broker configuration. I don't have this data now. I will ask our
> SRE whether they know the mean-time-to-fail for our disk. What I was told
> by SRE is that disk failure is the most common type of hardware failure.
>
> When there is disk failure, I think it is reasonable to move replica to
> another broker instead of another disk on the same broker. The reason we
> want to move replica within broker is mainly to optimize the Kafka cluster
> performance when we balance load across disks.
>
> In comparison to balancing replicas globally, the benefit of moving replica
> within broker is that:
>
> 1) the movement is faster since it doesn't go through socket or rely on the
> available network bandwidth;
> 2) much less impact on the replication traffic between broker by not taking
> up bandwidth between brokers. Depending on the pattern of traffic, we may
> need to balance load across disk frequently and it is necessary to prevent
> this operation from slowing down the existing operation (e.g. produce,
> consume, replication) in the Kafka cluster.
> 3) It gives us opportunity to do automatic broker rebalance between disks
> on the same broker.
>
>
> >3. Even if we had to move the replica within the broker, why cannot we
> just
> >treat it as another replica and have it go through the same replication
> >code path that we have today? The downside here is obviously that you need
> >to catchup from the leader but it is completely free! What do we think is
> >the impact of the network overhead in this case?
>
> Good point. My initial proposal actually used the existing
> ReplicaFetcherThread (i.e. the existing code path) to move replica between
> disks. However, I switched to use separate thread pool after discussion
> with Jun and Becket.
>
> The main argument for using separate thread pool is to actually keep the
> design simply and easy to reason about. There are a number of difference
> between inter-broker replication and intra-broker replication which makes
> it cleaner to do them in separate code path. I will list them below:
>
> - The throttling mechanism for inter-broker replication traffic and
> intra-broker replication traffic is different. For example, we may want to
> specify per-topic quota for inter-broker replication traffic because we may
> want some topic to be moved faster than other topic. But we don't care
> about priority of topics for intra-broker movement. So the current proposal
> only allows user to specify per-broker quota for inter-broker replication
> traffic.
>
> - The quota value for inter-broker replication traffic and intra-broker
> replication traffic is different. The available bandwidth for inter-broker
> replication can probably be much higher than the bandwidth for inter-broker
> 

Re: [DISCUSS]: KIP-161: streams record processing exception handlers

2017-05-29 Thread Jay Kreps
Hey Eno,

I think this makes sense. I do think people who spend time running
production stream processing systems will, over time, end up strongly
preferring the current behavior of failing and fixing the root problem
rather than skipping, but we don't need to force this on people as long as
the default is to fail.

One thing I'm confused about is the scope of the proposal. I think the plan
is that this would cover all exceptions that occur whether in serializers
or ANY user code? Is that right? So if I do stream.map(x =>
x.header.timestamp) and that throws a NullPointerException, this would be
triggered? If so what I understand is that what is passed in to me is the
original consumer record, not the value x that produced the null pointer
exception? Is that right? If this understanding is correct then the
name RecordExceptionHandler should maybe be something like
ProcessingExceptionHandler since the exception isn't necessarily directly
tied to an input Record, right?

A couple of other comments:

   - It's important we maintain the original stack trace when we rethrow
   the exception (probably obvious, but thought I'd mention it)
   - As a matter of style I'd advocate for making a single
   DefaultExceptionHandler which logs the error and adding configs for this to
   control when (if ever) it fails. This will allow adding additional useful
   options in a way that can be combined (such as the dead letter thing,
   retries, etc). Basically the point is that these facilities aren't
   "either/or". Also you mention adding configs for these in the existing
   proposal, it'd be good to say what the configs are.
   - I think we should hold off on retries unless we have worked out the
   full usage pattern, people can always implement their own. I think the idea
   is that you send the message to some kind of dead letter queue and then
   replay these later. This obviously destroys all semantic guarantees we are
   working hard to provide right now, which may be okay.
   - I agree that the LogAndThresholdExceptionHandler is closest to what
   most people think they want. I think making the exception handler stateful
   is probably fine since this is inherently an approximate threshold. I do
   think this is a bit more complex then it sounds though since you'll
   obviously need to compute some kind of cheap running rate. Obviously the
   two failure modes you'd need to avoid are that 1/1 failures = 100% OR
   conversely that it runs successfully for one year and then fails 100% of
   the time but that isn't caught because of the excess prior history.

-Jay


On Thu, May 25, 2017 at 2:47 AM, Eno Thereska 
wrote:

> Hi there,
>
> I’ve added a KIP on improving exception handling in streams:
> KIP-161: streams record processing exception handlers.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 161%3A+streams+record+processing+exception+handlers <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161:+streams+record+
> processing+exception+handlers>
>
> Discussion and feedback is welcome, thank you.
> Eno


Re: [VOTE] KIP-156 Add option "dry run" to Streams application reset tool

2017-05-09 Thread Jay Kreps
+1
On Tue, May 9, 2017 at 3:41 PM BigData dev  wrote:

> Hi, Everyone,
>
> Since this is a relatively simple change, I would like to start the voting
> process for KIP-156: Add option "dry run" to Streams application reset tool
>
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=69410150
>
>
> The vote will run for a minimum of 72 hours.
>
>
> Thanks,
>
> Bharat
>


Re: [VOTE] KIP-144: Exponential backoff for broker reconnect attempts

2017-05-05 Thread Jay Kreps
+1
On Fri, May 5, 2017 at 7:29 PM Sriram Subramanian  wrote:

> +1
>
> On Fri, May 5, 2017 at 6:04 PM, Gwen Shapira  wrote:
>
> > +1
> >
> > On Fri, May 5, 2017 at 3:32 PM, Ismael Juma  wrote:
> >
> > > Hi all,
> > >
> > > Given the simple and non controversial nature of the KIP, I would like
> to
> > > start the voting process for KIP-144: Exponential backoff for broker
> > > reconnect attempts:
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 144%3A+Exponential+
> > > backoff+for+broker+reconnect+attempts
> > >
> > > The vote will run for a minimum of 72 hours.
> > >
> > > Thanks,
> > > Ismael
> > >
> >
> >
> >
> > --
> > *Gwen Shapira*
> > Product Manager | Confluent
> > 650.450.2760 | @gwenshap
> > Follow us: Twitter  | blog
> > 
> >
>


Re: [DISCUSS] KIP-150 - Kafka-Streams Cogroup

2017-05-05 Thread Jay Kreps
I haven't digested the proposal but the use case is pretty common. An
example would be the "customer 360" or "unified customer profile" use case
we often use. In that use case you have a dozen systems each of which has
some information about your customer (account details, settings, billing
info, customer service contacts, purchase history, etc). Your goal is to
join/munge these into a single profile record for each customer that has
all the relevant info in a usable form and is up-to-date with all the
source systems. If you implement that with kstreams as a sequence of joins
i think today we'd fully materialize N-1 intermediate tables. But clearly
you only need a single stage to group all these things that are already
co-partitioned. A distributed database would do this under the covers which
is arguably better (at least when it does the right thing) and perhaps we
could do the same thing but I'm not sure we know the partitioning so we may
need an explicit cogroup command that impllies they are already
co-partitioned.

-Jay

On Fri, May 5, 2017 at 5:56 AM, Kyle Winkelman 
wrote:

> Yea thats a good way to look at it.
> I have seen this type of functionality in a couple other platforms like
> spark and pig.
> https://spark.apache.org/docs/0.6.2/api/core/spark/PairRDDFunctions.html
> https://www.tutorialspoint.com/apache_pig/apache_pig_cogroup_operator.htm
>
>
> On May 5, 2017 7:43 AM, "Damian Guy"  wrote:
>
> > Hi Kyle,
> >
> > If i'm reading this correctly it is like an N way outer join? So an input
> > on any stream will always produce a new aggregated value - is that
> correct?
> > Effectively, each Aggregator just looks up the current value, aggregates
> > and forwards the result.
> > I need to look into it and think about it a bit more, but it seems like
> it
> > could be a useful optimization.
> >
> > On Thu, 4 May 2017 at 23:21 Kyle Winkelman 
> > wrote:
> >
> > > I sure can. I have added the following description to my KIP. If this
> > > doesn't help let me know and I will take some more time to build a
> > diagram
> > > and make more of a step by step description:
> > >
> > > Example with Current API:
> > >
> > > KTable table1 =
> > > builder.stream("topic1").groupByKey().aggregate(initializer1,
> > aggregator1,
> > > aggValueSerde1, storeName1);
> > > KTable table2 =
> > > builder.stream("topic2").groupByKey().aggregate(initializer2,
> > aggregator2,
> > > aggValueSerde2, storeName2);
> > > KTable table3 =
> > > builder.stream("topic3").groupByKey().aggregate(initializer3,
> > aggregator3,
> > > aggValueSerde3, storeName3);
> > > KTable cogrouped = table1.outerJoin(table2,
> > > joinerOneAndTwo).outerJoin(table3, joinerOneTwoAndThree);
> > >
> > > As you can see this creates 3 StateStores, requires 3 initializers,
> and 3
> > > aggValueSerdes. This also adds the pressure to user to define what the
> > > intermediate values are going to be (V1, V2, V3). They are left with a
> > > couple choices, first to make V1, V2, and V3 all the same as CG and the
> > two
> > > joiners are more like mergers, or second make them intermediate states
> > such
> > > as Topic1Map, Topic2Map, and Topic3Map and the joiners use those to
> build
> > > the final aggregate CG value. This is something the user could avoid
> > > thinking about with this KIP.
> > >
> > > When a new input arrives lets say at "topic1" it will first go through
> a
> > > KStreamAggregate grabbing the current aggregate from storeName1. It
> will
> > > produce this in the form of the first intermediate value and get sent
> > > through a KTableKTableOuterJoin where it will look up the current value
> > of
> > > the key in storeName2. It will use the first joiner to calculate the
> > second
> > > intermediate value, which will go through an additional
> > > KTableKTableOuterJoin. Here it will look up the current value of the
> key
> > in
> > > storeName3 and use the second joiner to build the final aggregate
> value.
> > >
> > > If you think through all possibilities for incoming topics you will see
> > > that no matter which topic it comes in through all three stores are
> > queried
> > > and all of the joiners must get used.
> > >
> > > Topology wise for N incoming streams this creates N
> > > KStreamAggregates, 2*(N-1) KTableKTableOuterJoins, and N-1
> > > KTableKTableJoinMergers.
> > >
> > >
> > >
> > > Example with Proposed API:
> > >
> > > KGroupedStream grouped1 = builder.stream("topic1").
> groupByKey();
> > > KGroupedStream grouped2 = builder.stream("topic2").
> groupByKey();
> > > KGroupedStream grouped3 = builder.stream("topic3").
> groupByKey();
> > > KTable cogrouped = grouped1.cogroup(initializer1, aggregator1,
> > > aggValueSerde1, storeName1)
> > > .cogroup(grouped2, aggregator2)
> > > .cogroup(grouped3, aggregator3)
> > > .aggregate();
> > >
> > > As you can see this creates 1 

Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface

2017-05-01 Thread Jay Kreps
Hey Matthias,

Yeah, I think we mostly agree. I think if we think the new way is a lot
better, then I agree keeping it deprecated for a long while then removing
is a viable path. To me this case just seemed like a minor thing so the 1+
years of having a deprecated way in common use and a new way and the
resulting confusion just seems unnecessary, especially since this is so
much in the common path of the most basic kafka usage. My experience with
other apis like MapReduce or Google Collections that did this was pretty
negative.

Big time agreement that doing both is not the right thing to do. People
aren't looking for 4 different ways of creating a ProducerRecord, that just
generates confusion.

-Jay

On Mon, May 1, 2017 at 10:34 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hi,
>
> I am personally not a big fan of providing two APIs to do the same
> thing. If we believe that one API is better than the other, we should
> indicate this by deprecating the old API IMHO.
>
> Just my two cents.
>
>
> -Matthias
>
>
> On 4/30/17 11:05 PM, Michael Pearce wrote:
> > See
> >
> > https://hc.apache.org/httpcomponents-client-ga/tutorial/html/fluent.html
> >
> > Doesn't cause much issue over there where you have a fluent api wrapper
> for those who like that style, and the original more verbose api.
> >
> > Surely it would be better than removing a way of doing things that
> everyone has got used to and built their code around ala constructors
> approach. And simply provide a wrapper to provide a per field way of doing
> things.
> >
> >
> >
> >
> >
> > Sent using OWA for iPhone
> > 
> > From: Stephane Maarek <steph...@simplemachines.com.au>
> > Sent: Monday, May 1, 2017 6:37:44 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface
> >
> > I’m not sure how people would feel about having two distinct methods to
> build the same object?
> > An API wrapper may be useful, but it doesn’t bring opinion about how one
> should program, that’s just driven by the docs.
> > I’m okay with that, but we need concensus
> >
> >
> > On 1/5/17, 6:08 am, "Michael Pearce" <michael.pea...@ig.com> wrote:
> >
> > Why not, instead of deprecating or removing whats there, as noted,
> its a point of preference, think about something that could wrap the
> existing, but provide an api that for you is cleaner?
> >
> > e.g. here's a sample idea building on a fluent api way. (this wraps
> the producer and producer records so no changes needed)
> >
> > https://gist.github.com/michaelandrepearce/
> de0f5ad4aa7d39d243781741c58c293e
> >
> > In future as new items further add to Producer Record, they just
> become new methods in the fluent API, as it builds the ProducerRecord using
> the most exhaustive constructor.
> >
> >
> >
> > 
> > From: Matthias J. Sax <matth...@confluent.io>
> > Sent: Saturday, April 29, 2017 6:52 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface
> >
> > I understand that we cannot just break stuff (btw: also not for
> > Streams!). But deprecating does not break anything, so I don't think
> > it's a big deal to change the API as long as we keep the old API as
> > deprecated.
> >
> >
> > -Matthias
> >
> > On 4/29/17 9:28 AM, Jay Kreps wrote:
> > > Hey Matthias,
> > >
> > > Yeah I agree, I'm not against change as a general thing! I also
> think if
> > > you look back on the last two years, we completely rewrote the
> producer and
> > > consumer APIs, reworked the binary protocol many times over, and
> added the
> > > connector and stream processing apis, both major new additions. So
> I don't
> > > think we're in too much danger of stagnating!
> > >
> > > My two cents was just around breaking compatibility for trivial
> changes
> > > like constructor => builder. I think this only applies to the
> producer,
> > > consumer, and connect apis which are heavily embedded in hundreds
> of
> > > ecosystem components that depend on them. This is different from
> direct
> > > usage. If we break the streams api it is really no big deal---apps
> just
> > > need to rebuild when they upgrade, not the end of the world at
> all. However
> > > because many intermediate things d

Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface

2017-04-29 Thread Jay Kreps
Hey Matthias,

Yeah I agree, I'm not against change as a general thing! I also think if
you look back on the last two years, we completely rewrote the producer and
consumer APIs, reworked the binary protocol many times over, and added the
connector and stream processing apis, both major new additions. So I don't
think we're in too much danger of stagnating!

My two cents was just around breaking compatibility for trivial changes
like constructor => builder. I think this only applies to the producer,
consumer, and connect apis which are heavily embedded in hundreds of
ecosystem components that depend on them. This is different from direct
usage. If we break the streams api it is really no big deal---apps just
need to rebuild when they upgrade, not the end of the world at all. However
because many intermediate things depend on the Kafka producer you can cause
these weird situations where your app depends on two third party things
that use Kafka and each requires different, incompatible versions. We did
this a lot in earlier versions of Kafka and it was the cause of much angst
(and an ingrained general reluctance to upgrade) from our users.

I still think we may have to break things, i just don't think we should do
it for things like builders vs direct constructors which i think are kind
of a debatable matter of taste.

-Jay



On Mon, Apr 24, 2017 at 9:40 AM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Hey Jay,
>
> I understand your concern, and for sure, we will need to keep the
> current constructors deprecated for a long time (ie, many years).
>
> But if we don't make the move, we will not be able to improve. And I
> think warnings about using deprecated APIs is an acceptable price to
> pay. And the API improvements will help new people who adopt Kafka to
> get started more easily.
>
> Otherwise Kafka might end up as many other enterprise software with a
> lots of old stuff that is kept forever because nobody has the guts to
> improve/change it.
>
> Of course, we can still improve the docs of the deprecated constructors,
> too.
>
> Just my two cents.
>
>
> -Matthias
>
> On 4/23/17 3:37 PM, Jay Kreps wrote:
> > Hey guys,
> >
> > I definitely think that the constructors could have been better designed,
> > but I think given that they're in heavy use I don't think this proposal
> > will improve things. Deprecating constructors just leaves everyone with
> > lots of warnings and crossed out things. We can't actually delete the
> > methods because lots of code needs to be usable across multiple Kafka
> > versions, right? So we aren't picking between the original approach
> (worse)
> > and the new approach (better); what we are proposing is a perpetual
> > mingling of the original style and the new style with a bunch of
> deprecated
> > stuff, which I think is worst of all.
> >
> > I'd vote for just documenting the meaning of null in the ProducerRecord
> > constructor.
> >
> > -Jay
> >
> > On Wed, Apr 19, 2017 at 3:34 PM, Stephane Maarek <
> > steph...@simplemachines.com.au> wrote:
> >
> >> Hi all,
> >>
> >> My first KIP, let me know your thoughts!
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
> >> 141+-+ProducerRecordBuilder+Interface
> >>
> >>
> >> Cheers,
> >> Stephane
> >>
> >
>
>


Re: [ANNOUNCE] New committer: Rajini Sivaram

2017-04-24 Thread Jay Kreps
Congrats Rajini!

On Mon, Apr 24, 2017 at 2:06 PM, Gwen Shapira  wrote:

> The PMC for Apache Kafka has invited Rajini Sivaram as a committer and we
> are pleased to announce that she has accepted!
>
> Rajini contributed 83 patches, 8 KIPs (all security and quota
> improvements) and a significant number of reviews. She is also on the
> conference committee for Kafka Summit, where she helped select content
> for our community event. Through her contributions she's shown good
> judgement, good coding skills, willingness to work with the community on
> finding the best
> solutions and very consistent follow through on her work.
>
> Thank you for your contributions, Rajini! Looking forward to many more :)
>
> Gwen, for the Apache Kafka PMC
>


Re: [DISCUSS] KIP 141 - ProducerRecordBuilder Interface

2017-04-24 Thread Jay Kreps
Hey guys,

I definitely think that the constructors could have been better designed,
but I think given that they're in heavy use I don't think this proposal
will improve things. Deprecating constructors just leaves everyone with
lots of warnings and crossed out things. We can't actually delete the
methods because lots of code needs to be usable across multiple Kafka
versions, right? So we aren't picking between the original approach (worse)
and the new approach (better); what we are proposing is a perpetual
mingling of the original style and the new style with a bunch of deprecated
stuff, which I think is worst of all.

I'd vote for just documenting the meaning of null in the ProducerRecord
constructor.

-Jay

On Wed, Apr 19, 2017 at 3:34 PM, Stephane Maarek <
steph...@simplemachines.com.au> wrote:

> Hi all,
>
> My first KIP, let me know your thoughts!
> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
> 141+-+ProducerRecordBuilder+Interface
>
>
> Cheers,
> Stephane
>


Re: [VOTE] KIP-114: KTable state stores and improved semantics

2017-04-22 Thread Jay Kreps
+1 Very well thought out.

-Jay

On Fri, Apr 21, 2017 at 10:39 AM Eno Thereska 
wrote:

> Hi there,
>
> Unless there are more issues on the discuss thread, I'd like to start the
> vote on KIP-114.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114%3A+KTable+state+stores+and+improved+semantics
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-114:+KTable+state+stores+and+improved+semantics
> >.
>
> Thanks
> Eno


Re: [DISCUSS] KIP-138: Change punctuate semantics

2017-04-04 Thread Jay Kreps
Hey guys,

One thing I've always found super important for this kind of design work is
to do a really good job of cataloging the landscape of use cases and how
prevalent each one is. By that I mean not just listing lots of uses, but
also grouping them into categories that functionally need the same thing.
In the absence of this it is very hard to reason about design proposals.
>From the proposals so far I think we have a lot of discussion around
possible apis, but less around what the user needs for different use cases
and how they would implement that using the api.

Here is an example:
You aggregate click and impression data for a reddit like site. Every ten
minutes you want to output a ranked list of the top 10 articles ranked by
clicks/impressions for each geographical area. I want to be able run this
in steady state as well as rerun to regenerate results (or catch up if it
crashes).

There are a couple of tricky things that seem to make this hard with either
of the options proposed:
1. If I emit this data using event time I have the problem described where
a geographical region with no new clicks or impressions will fail to output
results.
2. If I emit this data using system time I have the problem that when
reprocessing data my window may not be ten minutes but 10 hours if my
processing is very fast so it dramatically changes the output.

Maybe a hybrid solution works: I window by event time but trigger results
by system time for windows that have updated? Not really sure the details
of making that work. Does that work? Are there concrete examples where you
actually want the current behavior?

-Jay


On Tue, Apr 4, 2017 at 8:32 PM, Arun Mathew  wrote:

> Hi All,
>
> Thanks for the KIP. We were also in need of a mechanism to trigger
> punctuate in the absence of events.
>
> As I described in [
> https://issues.apache.org/jira/browse/KAFKA-3514?
> focusedCommentId=15926036=com.atlassian.jira.
> plugin.system.issuetabpanels:comment-tabpanel#comment-15926036
> ],
>
>- Our approached involved using the event time by default.
>- The method to check if there is any punctuate ready in the
>PunctuationQueue is triggered via the any event received by the stream
>tread, or at the polling intervals in the absence of any events.
>- When we create Punctuate objects (which contains the next event time
>for punctuation and interval), we also record the creation time (system
>time).
>- While checking for maturity of Punctuate Schedule by mayBePunctuate
>method, we also check if the system clock has elapsed the punctuate
>interval since the schedule creation time.
>- In the absence of any event, or in the absence of any event for one
>topic in the partition group assigned to the stream task, the system
> time
>will elapse the interval and we trigger a punctuate using the expected
>punctuation event time.
>- we then create the next punctuation schedule as punctuation event time
>+ punctuation interval, [again recording the system time of creation of
> the
>schedule].
>
> We call this a Hybrid Punctuate. Of course, this approach has pros and
> cons.
> Pros
>
>- Punctuates will happen in  time duration at max in
>terms of system time.
>- The semantics as a whole continues to revolve around event time.
>- We can use the old data [old timestamps] to rerun any experiments or
>tests.
>
> Cons
>
>- In case the   is not a time duration [say logical
>time/event count], then the approach might not be meaningful.
>- In case there is a case where we have to wait for an actual event from
>a low event rate partition in the partition group, this approach will
> jump
>the gun.
>- in case the event processing cannot catch up with the event rate and
>the expected timestamp events gets queued for long time, this approach
>might jump the gun.
>
> I believe the above approach and discussion goes close to the approach A.
>
> ---
>
> I like the idea of having an even count based punctuate.
>
> ---
>
> I agree with the discussion around approach C, that we should provide the
> user with the option to choose system time or event time based punctuates.
> But I believe that the user predominantly wants to use event time while not
> missing out on regular punctuates due to event delays or event absences.
> Hence a complex punctuate option as Matthias mentioned (quoted below) would
> be most apt.
>
> "- We might want to add "complex" schedules later on (like, punctuate on
> every 10 seconds event-time or 60 seconds system-time whatever comes
> first)."
>
> ---
>
> I think I read somewhere that Kafka Streams started with System Time as the
> punctuation standard, but was later changed to Event Time. I guess there
> would be some good reason behind it. As Kafka Streams want to evolve more
> on the Stream Processing front, I believe the emphasis on event time would
> remain quite 

Re: [VOTE] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-29 Thread Jay Kreps
+1

-Jay

On Mon, Mar 20, 2017 at 11:27 AM, Matthias J. Sax 
wrote:

> Hi,
>
> I would like to start the vote for KIP-129. Of course, feel free to
> provide some more feedback on the DISCUSS thread.
>
> Thanks a lot!
>
>
> -Matthias
>
>


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-22 Thread Jay Kreps
;> -Matthias
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> On 3/14/17 3:36 AM, Michael Noll wrote:
> > > >>>>>> I see Jay's point, and I agree with much of it -- notably about
> > > being
> > > >>>>>> careful which concepts we do and do not expose, depending on
> which
> > > >> user
> > > >>>>>> group / user type is affected.  That said, I'm not sure yet
> > whether
> > > >> or
> > > >>>>> not
> > > >>>>>> we should get rid of "Topology" (or a similar term) in the DSL.
> > > >>>>>>
> > > >>>>>> For what it's worth, here's how related technologies define/name
> > > >> their
> > > >>>>>> "topologies" and "builders".  Note that, in all cases, it's
> about
> > > >>>>>> constructing a logical processing plan, which then is being
> > > >>> executed/run.
> > > >>>>>>
> > > >>>>>> - `Pipeline` (Google Dataflow/Apache Beam)
> > > >>>>>> - To add a source you first instantiate the Source (e.g.
> > > >>>>>> `TextIO.Read.from("gs://some/inputData.txt")`),
> > > >>>>>>   then attach it to your processing plan via
> > > >>>>> `Pipeline#apply()`.
> > > >>>>>>   This setup is a bit different to our DSL because in our
> DSL
> > > the
> > > >>>>>> builder does both, i.e.
> > > >>>>>>   instantiating + auto-attaching to itself.
> > > >>>>>> - To execute the processing plan you call
> > `Pipeline#execute()`.
> > > >>>>>> - `StreamingContext`` (Spark): This setup is similar to our DSL.
> > > >>>>>> - To add a source you call e.g.
> > > >>>>>> `StreamingContext#socketTextStream("localhost", )`.
> > > >>>>>> - To execute the processing plan you call
> > > >>>>> `StreamingContext#execute()`.
> > > >>>>>> - `StreamExecutionEnvironment` (Flink): This setup is similar to
> > our
> > > >>> DSL.
> > > >>>>>> - To add a source you call e.g.
> > > >>>>>> `StreamExecutionEnvironment#socketTextStream("localhost",
> )`.
> > > >>>>>> - To execute the processing plan you call
> > > >>>>>> `StreamExecutionEnvironment#execute()`.
> > > >>>>>> - `Graph`/`Flow` (Akka Streams), as a result of composing
> Sources
> > (~
> > > >>>>>> `KStreamBuilder.stream()`) and Sinks (~ `KStream#to()`)
> > > >>>>>>   into Flows, which are [Runnable]Graphs.
> > > >>>>>> - You instantiate a Source directly, and then compose the
> > Source
> > > >>> with
> > > >>>>>> Sinks to create a RunnableGraph:
> > > >>>>>>   see signature `Source#to[Mat2](sink: Graph[SinkShape[Out],
> > > >>> Mat2]):
> > > >>>>>> RunnableGraph[Mat]`.
> > > >>>>>> - To execute the processing plan you call `Flow#run()`.
> > > >>>>>>
> > > >>>>>> In our DSL, in comparison, we do:
> > > >>>>>>
> > > >>>>>> - `KStreamBuilder` (Kafka Streams API)
> > > >>>>>> - To add a source you call e.g.
> `KStreamBuilder#stream("input-
> > > >>>>> topic")`.
> > > >>>>>> - To execute the processing plan you create a `KafkaStreams`
> > > >>> instance
> > > >>>>>> from `KStreamBuilder`
> > > >>>>>>   (where the builder will instantiate the topology =
> > processing
> > > >>> plan
> > > >>>>> to
> > > >>>>>> be executed), and then
> > > >>>>>>   call `KafkaStreams#start()`.  Think of `KafkaStreams` as
> our
> > > >>>>> runner.
> > > >>>>>>

Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Jay Kreps
Hey Matthias,

Make sense, I'm more advocating for removing the word topology than any
particular new replacement.

-Jay

On Mon, Mar 13, 2017 at 12:30 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Jay,
>
> thanks for your feedback
>
> > What if instead we called it KStreamsBuilder?
>
> That's the current name and I personally think it's not the best one.
> The main reason why I don't like KStreamsBuilder is, that we have the
> concepts of KStreams and KTables, and the builder creates both. However,
> the name puts he focus on KStream and devalues KTable.
>
> I understand your argument, and I am personally open the remove the
> "Topology" part, and name it "StreamsBuilder". Not sure what others
> think about this.
>
>
> About Processor API: I like the idea in general, but I thinks it's out
> of scope for this KIP. KIP-120 has the focus on removing leaking
> internal APIs and do some cleanup how our API reflects some concepts.
>
> However, I added your idea to API discussion Wiki page and we take if
> from there:
> https://cwiki.apache.org/confluence/display/KAFKA/
> Kafka+Streams+Discussions
>
>
>
> -Matthias
>
>
> On 3/13/17 11:52 AM, Jay Kreps wrote:
> > Two things:
> >
> >1. This is a minor thing but the proposed new name for KStreamBuilder
> >is StreamsTopologyBuilder. I actually think we should not put
> topology in
> >the name as topology is not a concept you need to understand at the
> >kstreams layer right now. I'd think of three categories of concepts:
> (1)
> >concepts you need to understand to get going even for a simple
> example, (2)
> >concepts you need to understand to operate and debug a real
> production app,
> >(3) concepts we truly abstract and you don't need to ever understand.
> I
> >think in the kstream layer topologies are currently category (2), and
> this
> >is where they belong. By introducing the name in even the simplest
> example
> >it means the user has to go read about toplogies to really understand
> even
> >this simple snippet. What if instead we called it KStreamsBuilder?
> >2. For the processor api, I think this api is mostly not for end
> users.
> >However this are a couple cases where it might make sense to expose
> it. I
> >think users coming from Samza, or JMS's MessageListener (
> >https://docs.oracle.com/javaee/7/api/javax/jms/MessageListener.html)
> >understand a simple callback interface for message processing. In
> fact,
> >people often ask why Kafka's consumer doesn't provide such an
> interface.
> >I'd argue we do, it's KafkaStreams. The only issue is that the
> processor
> >API documentation is a bit scary for a person implementing this type
> of
> >api. My observation is that people using this style of API don't do a
> lot
> >of cross-message operations, then just do single message operations
> and use
> >a database for anything that spans messages. They also don't factor
> their
> >code into many MessageListeners and compose them, they just have one
> >listener that has the complete handling logic. Say I am a user who
> wants to
> >implement a single Processor in this style. Do we have an easy way to
> do
> >that today (either with the .transform/.process methods in kstreams
> or with
> >the topology apis)? Is there anything we can do in the way of trivial
> >helper code to make this better? Also, how can we explain that
> pattern to
> >people? I think currently we have pretty in-depth docs on our apis
> but I
> >suspect a person trying to figure out how to implement a simple
> callback
> >might get a bit lost trying to figure out how to wire it up. A simple
> five
> >line example in the docs would probably help a lot. Not sure if this
> is
> >best addressed in this KIP or is a side comment.
> >
> > Cheers,
> >
> > -Jay
> >
> > On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi All,
> >>
> >> I did prepare a KIP to do some cleanup some of Kafka's Streaming API.
> >>
> >> Please have a look here:
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 120%3A+Cleanup+Kafka+Streams+builder+API
> >>
> >> Looking forward to your feedback!
> >>
> >>
> >> -Matthias
> >>
> >>
> >
>
>


Re: [DISCUSS] KIP-120: Cleanup Kafka Streams builder API

2017-03-13 Thread Jay Kreps
Two things:

   1. This is a minor thing but the proposed new name for KStreamBuilder
   is StreamsTopologyBuilder. I actually think we should not put topology in
   the name as topology is not a concept you need to understand at the
   kstreams layer right now. I'd think of three categories of concepts: (1)
   concepts you need to understand to get going even for a simple example, (2)
   concepts you need to understand to operate and debug a real production app,
   (3) concepts we truly abstract and you don't need to ever understand. I
   think in the kstream layer topologies are currently category (2), and this
   is where they belong. By introducing the name in even the simplest example
   it means the user has to go read about toplogies to really understand even
   this simple snippet. What if instead we called it KStreamsBuilder?
   2. For the processor api, I think this api is mostly not for end users.
   However this are a couple cases where it might make sense to expose it. I
   think users coming from Samza, or JMS's MessageListener (
   https://docs.oracle.com/javaee/7/api/javax/jms/MessageListener.html)
   understand a simple callback interface for message processing. In fact,
   people often ask why Kafka's consumer doesn't provide such an interface.
   I'd argue we do, it's KafkaStreams. The only issue is that the processor
   API documentation is a bit scary for a person implementing this type of
   api. My observation is that people using this style of API don't do a lot
   of cross-message operations, then just do single message operations and use
   a database for anything that spans messages. They also don't factor their
   code into many MessageListeners and compose them, they just have one
   listener that has the complete handling logic. Say I am a user who wants to
   implement a single Processor in this style. Do we have an easy way to do
   that today (either with the .transform/.process methods in kstreams or with
   the topology apis)? Is there anything we can do in the way of trivial
   helper code to make this better? Also, how can we explain that pattern to
   people? I think currently we have pretty in-depth docs on our apis but I
   suspect a person trying to figure out how to implement a simple callback
   might get a bit lost trying to figure out how to wire it up. A simple five
   line example in the docs would probably help a lot. Not sure if this is
   best addressed in this KIP or is a side comment.

Cheers,

-Jay

On Fri, Feb 3, 2017 at 3:33 PM, Matthias J. Sax 
wrote:

> Hi All,
>
> I did prepare a KIP to do some cleanup some of Kafka's Streaming API.
>
> Please have a look here:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 120%3A+Cleanup+Kafka+Streams+builder+API
>
> Looking forward to your feedback!
>
>
> -Matthias
>
>


Re: [DISCUSS] KIP-129: Kafka Streams Exactly-Once Semantics

2017-03-02 Thread Jay Kreps
I second the concern on with the one producer per task approach. At a
high-level it seems to make sense but I think Damian is exactly right that
that cuts against the general design of the producer. Many people have high
input partition counts and will have high task counts as a result. I think
processing 1000 partitions should not be an unreasonable thing to want to
do.

The tricky bits will be:

   - Reduced effectiveness of batching (or more latency and memory to get
   equivalent batching). This doesn't show up in simple benchmarks because
   much of the penalty is I/O and CPU on the broker and the additional threads
   from all the producers can make a single-threaded benchmark seem faster.
   - TCP connection explosion. We maintain one connection per broker. This
   is already high since each app instance does this. This design though will
   add an additional multiplicative factor based on the partition count of the
   input.
   - Connection and metadata request storms. When an instance with 1000
   tasks starts up it is going to try to create many thousands of connections
   and issue a thousand metadata requests all at once.
   - Memory usage. We currently default to 64MB per producer. This can be
   tuned down, but the fact that we are spreading the batching over more
   producers will fundamentally mean we need a lot more memory to get good
   perf and the memory usage will change as your task assignment changes so it
   will be hard to set correctly unless it is done automatically.
   - Metrics explosion (1000 producer instances, each with their own
   metrics to monitor).
   - Thread explosion, 1000 background threads, one per producer, each
   sending data.

-Jay

On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy  wrote:

> Hi Guozhang,
>
> Thanks for the KIP! This is an important feature for Kafka Streams and will
> help to unlock a bunch of use cases.
>
> I have some concerns/questions:
>
>1. Producer per task: I'm worried about the overhead this is going to
>put on both the streams app and the Kafka Brokers. You can easily
> imagine
>an app consuming thousands of partitions. What load will this put on the
>brokers? Am i correct in assuming that there will be metadata requests
> per
>Producer? The memory overhead in the streams app will also increase
> fairly
>significantly. Should we adjust ProducerConfig.BUFFER_MEMORY_CONFIG?
>2. State Store recovery: As we already know, restoring the entire
>changelog can take an extremely long time. Even with a fairly small
> dataset
>and an inappropriately tuned segment size, this can take way too long.
> My
>concern is that failures happen and then recovery takes "forever" and we
>end up in a situation where we need to change the max.poll.interval to
> be
>some very large number or else we end up in "rebalance hell". I don't
> think
>this provides a very good user experience. You mention RocksDB
>checkpointing in the doc - should we explore this idea some more? i.e.,
>understand the penalty for checkpointing. Maybe checkpoint every *n*
> commits?
>3. What does EoS mean for Caching? If we set the commit interval to
>100ms then the cache is not going to be very effective. Should it just
> be
>disabled?
>
> Thanks,
> Damian
>
> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang  wrote:
>
> > Hi all,
> >
> > I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
> provide
> > exactly-once processing semantics:
> >
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 129%3A+Streams+Exactly-Once+Semantics
> >
> > This KIP enables Streams users to optionally turn on exactly-once
> > processing semantics without changing their app code at all by leveraging
> > the transactional messaging features provided in KIP-98.
> >
> > The above wiki page provides a high-level view of the proposed changes,
> > while detailed implementation design can be found in this Google doc:
> >
> >
> > https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMadu
> FK1DAB8_gBYA2c
> >
> > We would love to hear your comments and suggestions.
> >
> > Thanks,
> > -- Guozhang
> >
>


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-24 Thread Jay Kreps
e it depends on the sequence of
> requests.
> > But we can look into more metrics after the KIP is implemented if
> required.
> >
> > I think we need to limit the maximum delay since all requests are
> > throttled. If a client has a quota of 0.001 units and a single request
> used
> > 50ms, we don't want to delay all requests from the client by 50 seconds,
> > throwing the client out of all its consumer groups. The issue is only if
> a
> > user is allocated a quota that is insufficient to process one large
> > request. The expectation is that the units allocated per user will be
> much
> > higher than the time taken to process one request and the limit should
> > seldom be applied. Agree this needs proper documentation.
> >
> > Regards,
> >
> > Rajini
> >
> >
> > On Thu, Feb 23, 2017 at 8:04 PM, radai <radai.rosenbl...@gmail.com>
> wrote:
> >
> >> @jun: i wasnt concerned about tying up a request processing thread, but
> >> IIUC the code does still read the entire request out, which might add-up
> >> to
> >> a non-negligible amount of memory.
> >>
> >> On Thu, Feb 23, 2017 at 11:55 AM, Dong Lin <lindon...@gmail.com> wrote:
> >>
> >> > Hey Rajini,
> >> >
> >> > The current KIP says that the maximum delay will be reduced to window
> >> size
> >> > if it is larger than the window size. I have a concern with this:
> >> >
> >> > 1) This essentially means that the user is allowed to exceed their
> quota
> >> > over a long period of time. Can you provide an upper bound on this
> >> > deviation?
> >> >
> >> > 2) What is the motivation for cap the maximum delay by the window
> size?
> >> I
> >> > am wondering if there is better alternative to address the problem.
> >> >
> >> > 3) It means that the existing metric-related config will have a more
> >> > directly impact on the mechanism of this io-thread-unit-based quota.
> The
> >> > may be an important change depending on the answer to 1) above. We
> >> probably
> >> > need to document this more explicitly.
> >> >
> >> > Dong
> >> >
> >> >
> >> > On Thu, Feb 23, 2017 at 10:56 AM, Dong Lin <lindon...@gmail.com>
> wrote:
> >> >
> >> > > Hey Jun,
> >> > >
> >> > > Yeah you are right. I thought it wasn't because at LinkedIn it will
> be
> >> > too
> >> > > much pressure on inGraph to expose those per-clientId metrics so we
> >> ended
> >> > > up printing them periodically to local log. Never mind if it is not
> a
> >> > > general problem.
> >> > >
> >> > > Hey Rajini,
> >> > >
> >> > > - I agree with Jay that we probably don't want to add a new field
> for
> >> > > every quota ProduceResponse or FetchResponse. Is there any use-case
> >> for
> >> > > having separate throttle-time fields for byte-rate-quota and
> >> > > io-thread-unit-quota? You probably need to document this as
> interface
> >> > > change if you plan to add new field in any request.
> >> > >
> >> > > - I don't think IOThread belongs to quotaType. The existing quota
> >> types
> >> > > (i.e. Produce/Fetch/LeaderReplication/FollowerReplication) identify
> >> the
> >> > > type of request that are throttled, not the quota mechanism that is
> >> > applied.
> >> > >
> >> > > - If a request is throttled due to this io-thread-unit-based quota,
> is
> >> > the
> >> > > existing queue-size metric in ClientQuotaManager incremented?
> >> > >
> >> > > - In the interest of providing guide line for admin to decide
> >> > > io-thread-unit-based quota and for user to understand its impact on
> >> their
> >> > > traffic, would it be useful to have a metric that shows the overall
> >> > > byte-rate per io-thread-unit? Can we also show this a per-clientId
> >> > metric?
> >> > >
> >> > > Thanks,
> >> > > Dong
> >> > >
> >> > >
> >> > > On Thu, Feb 23, 2017 at 9:25 AM, Jun Rao <j...@confluent.io> wrote:
> >> > >
> >> > >> Hi, Ismael,
> >> > >>
> >> > >> For #3, typicall

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-02-23 Thread Jay Kreps
Hey Becket,

Yeah that makes sense.

I agree that you'd really have to both fix the estimation (i.e. make it per
topic or make it better estimate the high percentiles) AND have the
recovery mechanism. If you are underestimating often and then paying a high
recovery price that won't fly.

I think you take my main point though, which is just that I hate to exposes
these super low level options to users because it is so hard to explain to
people what it means and how they should set it. So if it is possible to
make either some combination of better estimation and splitting or better
tolerance of overage that would be preferrable.

-Jay

On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin <becket@gmail.com> wrote:

> @Dong,
>
> Thanks for the comments. The default behavior of the producer won't change.
> If the users want to use the uncompressed message size, they probably will
> also bump up the batch size to somewhere close to the max message size.
> This would be in the document. BTW the default batch size is 16K which is
> pretty small.
>
> @Jay,
>
> Yeah, we actually had debated quite a bit internally what is the best
> solution to this.
>
> I completely agree it is a bug. In practice we usually leave some headroom
> to allow the compressed size to grow a little if the the original messages
> are not compressible, for example, 1000 KB instead of exactly 1 MB. It is
> likely safe enough.
>
> The major concern for the rejected alternative is performance. It largely
> depends on how frequent we need to split a batch, i.e. how likely the
> estimation can go off. If we only need to the split work occasionally, the
> cost would be amortized so we don't need to worry about it too much.
> However, it looks that for a producer with shared topics, the estimation is
> always off. As an example, consider two topics, one with compression ratio
> 0.6 the other 0.2, assuming exactly same traffic, the average compression
> ratio would be roughly 0.4, which is not right for either of the topics. So
> almost half of the batches (of the topics with 0.6 compression ratio) will
> end up larger than the configured batch size. When it comes to more topics
> such as mirror maker, this becomes more unpredictable. To avoid frequent
> rejection / split of the batches, we need to configured the batch size
> pretty conservatively. This could actually hurt the performance because we
> are shoehorn the messages that are highly compressible to a small batch so
> that the other topics that are not that compressible will not become too
> large with the same batch size. At LinkedIn, our batch size is configured
> to 64 KB because of this. I think we may actually have better batching if
> we just use the uncompressed message size and 800 KB batch size.
>
> We did not think about loosening the message size restriction, but that
> sounds a viable solution given that the consumer now can fetch oversized
> messages. One concern would be that on the broker side oversized messages
> will bring more memory pressure. With KIP-92, we may mitigate that, but the
> memory allocation for large messages may not be very GC friendly. I need to
> think about this a little more.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Becket,
> >
> > I get the problem we want to solve with this, but I don't think this is
> > something that makes sense as a user controlled knob that everyone
> sending
> > data to kafka has to think about. It is basically a bug, right?
> >
> > First, as a technical question is it true that using the uncompressed
> size
> > for batching actually guarantees that you observe the limit? I think that
> > implies that compression always makes the messages smaller, which i think
> > usually true but is not guaranteed, right? e.g. if someone encrypts their
> > data which tends to randomize it and then enables compressesion, it could
> > slightly get bigger?
> >
> > I also wonder if the rejected alternatives you describe couldn't be made
> to
> > work: basically try to be a bit better at estimation and recover when we
> > guess wrong. I don't think the memory usage should be a problem: isn't it
> > the same memory usage the consumer of that topic would need? And can't
> you
> > do the splitting and recompression in a streaming fashion? If we an make
> > the estimation rate low and the recovery cost is just ~2x the normal cost
> > for that batch that should be totally fine, right? (It's technically true
> > you might have to split more than once, but since you halve it each time
> I
> > think should you get a number of halvings that is logarithmic in the miss

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-23 Thread Jay Kreps
 > > > > > > > > for
> > > > > > > > > > generating metrics that we could use, though we might
> want
> > to
> > > > > > switch
> > > > > > > to
> > > > > > > > > > nanoTime() instead of currentTimeMillis() since some of
> the
> > > > > values
> > > > > > > for
> > > > > > > > > > small requests may be < 1ms. But rather than add up the
> > time
> > > > > spent
> > > > > > in
> > > > > > > > I/O
> > > > > > > > > > thread and network thread, wouldn't it be better to
> convert
> > > the
> > > > > > time
> > > > > > > > > spent
> > > > > > > > > > on each thread into a separate ratio? UserA has a request
> > > quota
> > > > > of
> > > > > > > 5%.
> > > > > > > > > Can
> > > > > > > > > > we take that to mean that UserA can use 5% of the time on
> > > > network
> > > > > > > > threads
> > > > > > > > > > and 5% of the time on I/O threads? If either is exceeded,
> > the
> > > > > > > response
> > > > > > > > is
> > > > > > > > > > throttled - it would mean maintaining two sets of metrics
> > for
> > > > the
> > > > > > two
> > > > > > > > > > durations, but would result in more meaningful ratios. We
> > > could
> > > > > > > define
> > > > > > > > > two
> > > > > > > > > > quota limits (UserA has 5% of request threads and 10% of
> > > > network
> > > > > > > > > threads),
> > > > > > > > > > but that seems unnecessary and harder to explain to
> users.
> > > > > > > > > >
> > > > > > > > > > Back to why and how quotas are applied to network thread
> > > > > > utilization:
> > > > > > > > > > a) In the case of fetch,  the time spent in the network
> > > thread
> > > > > may
> > > > > > be
> > > > > > > > > > significant and I can see the need to include this. Are
> > there
> > > > > other
> > > > > > > > > > requests where the network thread utilization is
> > significant?
> > > > In
> > > > > > the
> > > > > > > > case
> > > > > > > > > > of fetch, request handler thread utilization would
> throttle
> > > > > clients
> > > > > > > > with
> > > > > > > > > > high request rate, low data volume and fetch byte rate
> > quota
> > > > will
> > > > > > > > > throttle
> > > > > > > > > > clients with high data volume. Network thread utilization
> > is
> > > > > > perhaps
> > > > > > > > > > proportional to the data volume. I am wondering if we
> even
> > > need
> > > > > to
> > > > > > > > > throttle
> > > > > > > > > > based on network thread utilization or whether the data
> > > volume
> > > > > > quota
> > > > > > > > > covers
> > > > > > > > > > this case.
> > > > > > > > > >
> > > > > > > > > > b) At the moment, we record and check for quota violation
> > at
> > > > the
> > > > > > same
> > > > > > > > > time.
> > > > > > > > > > If a quota is violated, the response is delayed. Using
> > Jay'e
> > > > > > example
> > > > > > > of
> > > > > > > > > > disk reads for fetches happening in the network thread,
> We
> > > > can't
> > > > > > > record
> > > > > > > > > and
> > > > > > > > > > delay a response after the disk reads. We could record
> the
> > > time
> > > > > > spent
> > > > > > > > on
> > > > > > > >

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-02-22 Thread Jay Kreps
Hey Becket,

I get the problem we want to solve with this, but I don't think this is
something that makes sense as a user controlled knob that everyone sending
data to kafka has to think about. It is basically a bug, right?

First, as a technical question is it true that using the uncompressed size
for batching actually guarantees that you observe the limit? I think that
implies that compression always makes the messages smaller, which i think
usually true but is not guaranteed, right? e.g. if someone encrypts their
data which tends to randomize it and then enables compressesion, it could
slightly get bigger?

I also wonder if the rejected alternatives you describe couldn't be made to
work: basically try to be a bit better at estimation and recover when we
guess wrong. I don't think the memory usage should be a problem: isn't it
the same memory usage the consumer of that topic would need? And can't you
do the splitting and recompression in a streaming fashion? If we an make
the estimation rate low and the recovery cost is just ~2x the normal cost
for that batch that should be totally fine, right? (It's technically true
you might have to split more than once, but since you halve it each time I
think should you get a number of halvings that is logarithmic in the miss
size, which, with better estimation you'd hope would be super duper small).

Alternatively maybe we could work on the other side of the problem and try
to make it so that a small miss on message size isn't a big problem. I
think original issue was that max size and fetch size were tightly coupled
and the way memory in the consumer worked you really wanted fetch size to
be as small as possible because you'd use that much memory per fetched
partition and the consumer would get stuck if its fetch size wasn't big
enough. I think we made some progress on that issue and maybe more could be
done there so that a small bit of fuzziness around the size would not be an
issue?

-Jay



On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin  wrote:

> Hi folks,
>
> I would like to start the discussion thread on KIP-126. The KIP propose
> adding a new configuration to KafkaProducer to allow batching based on
> uncompressed message size.
>
> Comments are welcome.
>
> The KIP wiki is following:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Jay Kreps
Hey Becket/Rajini,

When I thought about it more deeply I came around to the "percent of
processing time" metric too. It seems a lot closer to the thing we actually
care about and need to protect. I also think this would be a very useful
metric even in the absence of throttling just to debug whose using capacity.

Two problems to consider:

   1. I agree that for the user it is understandable what lead to their
   being throttled, but it is a bit hard to figure out the safe range for
   them. i.e. if I have a new app that will send 200 messages/sec I can
   probably reason that I'll be under the throttling limit of 300 req/sec.
   However if I need to be under a 10% CPU resources limit it may be a bit
   harder for me to know a priori if i will or won't.
   2. Calculating the available CPU time is a bit difficult since there are
   actually two thread pools--the I/O threads and the network threads. I think
   it might be workable to count just the I/O thread time as in the proposal,
   but the network thread work is actually non-trivial (e.g. all the disk
   reads for fetches happen in that thread). If you count both the network and
   I/O threads it can skew things a bit. E.g. say you have 50 network threads,
   10 I/O threads, and 8 cores, what is the available cpu time available in a
   second? I suppose this is a problem whenever you have a bottleneck between
   I/O and network threads or if you end up significantly over-provisioning
   one pool (both of which are hard to avoid).

An alternative for CPU throttling would be to use this api:
http://docs.oracle.com/javase/1.5.0/docs/api/java/lang/management/ThreadMXBean.html#getThreadCpuTime(long)

That would let you track actual CPU usage across the network, I/O threads,
and purgatory threads and look at it as a percentage of total cores. I
think this fixes many problems in the reliability of the metric. It's
meaning is slightly different as it is just CPU (you don't get charged for
time blocking on I/O) but that may be okay because we already have a
throttle on I/O. The downside is I think it is possible this api can be
disabled or isn't always available and it may also be expensive (also I've
never used it so not sure if it really works the way i think).

-Jay

On Mon, Feb 20, 2017 at 3:17 PM, Becket Qin <becket@gmail.com> wrote:

> If the purpose of the KIP is only to protect the cluster from being
> overwhelmed by crazy clients and is not intended to address resource
> allocation problem among the clients, I am wondering if using request
> handling time quota (CPU time quota) is a better option. Here are the
> reasons:
>
> 1. request handling time quota has better protection. Say we have request
> rate quota and set that to some value like 100 requests/sec, it is possible
> that some of the requests are very expensive actually take a lot of time to
> handle. In that case a few clients may still occupy a lot of CPU time even
> the request rate is low. Arguably we can carefully set request rate quota
> for each request and client id combination, but it could still be tricky to
> get it right for everyone.
>
> If we use the request time handling quota, we can simply say no clients can
> take up to more than 30% of the total request handling capacity (measured
> by time), regardless of the difference among different requests or what is
> the client doing. In this case maybe we can quota all the requests if we
> want to.
>
> 2. The main benefit of using request rate limit is that it seems more
> intuitive. It is true that it is probably easier to explain to the user
> what does that mean. However, in practice it looks the impact of request
> rate quota is not more quantifiable than the request handling time quota.
> Unlike the byte rate quota, it is still difficult to give a number about
> impact of throughput or latency when a request rate quota is hit. So it is
> not better than the request handling time quota. In fact I feel it is
> clearer to tell user that "you are limited because you have taken 30% of
> the CPU time on the broker" than otherwise something like "your request
> rate quota on metadata request has reached".
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Mon, Feb 20, 2017 at 2:23 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > I think this proposal makes a lot of sense (especially now that it is
> > oriented around request rate) and fills the biggest remaining gap in the
> > multi-tenancy story.
> >
> > I think for intra-cluster communication (StopReplica, etc) we could avoid
> > throttling entirely. You can secure or otherwise lock-down the cluster
> > communication to avoid any unauthorized external party from trying to
> > initiate these requests. As a result we are as likely to cause problems
> as
> > solve them by t

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-20 Thread Jay Kreps
I think this proposal makes a lot of sense (especially now that it is
oriented around request rate) and fills the biggest remaining gap in the
multi-tenancy story.

I think for intra-cluster communication (StopReplica, etc) we could avoid
throttling entirely. You can secure or otherwise lock-down the cluster
communication to avoid any unauthorized external party from trying to
initiate these requests. As a result we are as likely to cause problems as
solve them by throttling these, right?

I'm not so sure that we should exempt the consumer requests such as
heartbeat. It's true that if we throttle an app's heartbeat requests it may
cause it to fall out of its consumer group. However if we don't throttle it
it may DDOS the cluster if the heartbeat interval is set incorrectly or if
some client in some language has a bug. I think the policy with this kind
of throttling is to protect the cluster above any individual app, right? I
think in general this should be okay since for most deployments this
setting is meant as more of a safety valve---that is rather than set
something very close to what you expect to need (say 2 req/sec or whatever)
you would have something quite high (like 100 req/sec) with this meant to
prevent a client gone crazy. I think when used this way allowing those to
be throttled would actually provide meaningful protection.

-Jay



On Fri, Feb 17, 2017 at 9:05 AM, Rajini Sivaram 
wrote:

> Hi all,
>
> I have just created KIP-124 to introduce request rate quotas to Kafka:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 124+-+Request+rate+quotas
>
> The proposal is for a simple percentage request handling time quota that
> can be allocated to **, ** or **. There
> are a few other suggestions also under "Rejected alternatives". Feedback
> and suggestions are welcome.
>
> Thank you...
>
> Regards,
>
> Rajini
>


Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-14 Thread Jay Kreps
+1

Super happy with how this turned out. It's been a long journey since we
started thinking about this 3+ years ago. Can't wait to see it in
code---this is a big one! :-)

-Jay

On Wed, Feb 1, 2017 at 8:13 PM, Guozhang Wang  wrote:

> Hi all,
>
> We would like to start the voting process for KIP-98. The KIP can be found
> at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>
> Discussion thread can be found here:
>
> http://search-hadoop.com/m/Kafka/uyzND1jwZrr7HRHf?subj=+
> DISCUSS+KIP+98+Exactly+Once+Delivery+and+Transactional+Messaging
>
> Thanks,
>
> --
> -- Guozhang
>


Re: KIP-121 [VOTE]: Add KStream peek method

2017-02-14 Thread Jay Kreps
+1

Nice improvement.

-Jay

On Tue, Feb 14, 2017 at 1:22 PM, Steven Schlansker <
sschlans...@opentable.com> wrote:

> Hi, it looks like I have 2 of the 3 minimum votes, can a third voter
> please consider this KIP?
> Thanks.
>
> (PS - new revision on GitHub PR with hopefully the last round of
> improvements)
>
> > On Feb 8, 2017, at 9:06 PM, Matthias J. Sax 
> wrote:
> >
> > +1
> >
> > On 2/8/17 4:51 PM, Gwen Shapira wrote:
> >> +1 (binding)
> >>
> >> On Wed, Feb 8, 2017 at 4:45 PM, Steven Schlansker
> >>  wrote:
> >>> Hi everyone,
> >>>
> >>> Thank you for constructive feedback on KIP-121,
> KStream.peek(ForeachAction) ;
> >>> it seems like it is time to call a vote which I hope will pass easily
> :)
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 121%3A+Add+KStream+peek+method
> >>>
> >>> I believe the PR attached is already in good shape to consider merging:
> >>>
> >>> https://github.com/apache/kafka/pull/2493
> >>>
> >>> Thanks!
> >>> Steven
> >>>
> >>
> >>
> >>
> >
>
>


Re: [VOTE] KIP-82 Add Record Headers

2017-02-14 Thread Jay Kreps
Couple of things I think we still need to work out:

   1. I think we agree about the key, but I think we haven't talked about
   the value yet. I think if our goal is an open ecosystem of these header
   spread across many plugins from many systems we should consider making this
   a string as well so it can be printed, set via a UI, set in config, etc.
   Basically encouraging pluggable serialization formats here will lead to a
   bit of a tower of babel.
   2. This proposal still includes a pretty big change to our serialization
   and protocol definition layer. Essentially it is introducing an optional
   type, where the format is data dependent. I think this is actually a big
   change though it doesn't seem like it. It means you can no longer specify
   this type with our type definition DSL, and likewise it requires custom
   handling in client libs. This isn't a huge thing, since the Record
   definition is custom anyway, but I think this kind of protocol
   inconsistency is very non-desirable and ties you to hand-coding things. I
   think the type should instead by [Key Value] in our BNF, where key and
   value are both short strings as used elsewhere. This brings it in line with
   the rest of the protocol.
   3. Could we get more specific about the exact Java API change to
   ProducerRecord, ConsumerRecord, Record, etc?

-Jay

On Tue, Feb 14, 2017 at 9:42 AM, Michael Pearce 
wrote:

> Hi all,
>
> We would like to start the voting process for KIP-82 – Add record headers.
> The KIP can be found
> at
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 82+-+Add+Record+Headers
>
> Discussion thread(s) can be found here:
>
> http://search-hadoop.com/m/Kafka/uyzND1nSTOHTvj81?subj=
> Re+DISCUSS+KIP+82+Add+Record+Headers
> http://search-hadoop.com/m/Kafka/uyzND1Arxt22Tvj81?subj=
> Re+DISCUSS+KIP+82+Add+Record+Headers
> http://search-hadoop.com/?project=Kafka=KIP-82
>
>
>
> Thanks,
> Mike
>
> The information contained in this email is strictly confidential and for
> the use of the addressee only, unless otherwise indicated. If you are not
> the intended recipient, please do not read, copy, use or disclose to others
> this message or any attachment. Please also notify the sender by replying
> to this email or by telephone (+44(020 7896 0011) and then delete the email
> and any copies of it. Opinions, conclusion (etc) that do not relate to the
> official business of this company shall be understood as neither given nor
> endorsed by it. IG is a trading name of IG Markets Limited (a company
> registered in England and Wales, company number 04008957) and IG Index
> Limited (a company registered in England and Wales, company number
> 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> London EC4R 2YA. Both IG Markets Limited (register number 195355) and IG
> Index Limited (register number 114059) are authorised and regulated by the
> Financial Conduct Authority.
>


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-12 Thread Jay Kreps
, no messing with
> batch APIs.  If there is an error, then Future#get() throws an
> ExecutionException which wraps the relevant exception in the standard
> Java way.
>
> Here's a slightly less simple example:
>
> > AdminClient client = new AdminClientImpl(myConfig);
> > try {
> >   List<Future> futures = new LinkedList<>();
> >   for (String topicName: myNewTopicNames) {
> > creations.add(client.topics().
> > setClientTimeout(3).setCreationConfig(myTopicConfig).
> >   create(topicName, 3, (short) 2, false));
> >   }
> >   Futures.waitForAll(futures);
> > } finally {
> >   client.close();
> > }
>
> I went with Futures because I feel like ought to have some option for
> doing async.  It's a style of programming that has become a lot more
> popular with the rise of Node.js, Twisted python, etc. etc.  Also, as
> Ismael commented, Java 8 CompletableFuture is going to make Java's
> support for fluent async programming a lot stronger by allowing call
> chaining and much more.
>
> If we are going to support async, the simplest thing is just to make
> everything return a future and let people call get() if they want to run
> synchronously.  Having a mix of async and sync APIs is just going to be
> confusing and redundant.
>
> I think we should try to avoid creating single functions that start
> multiple requests if we can.  It makes things much uglier.  It means
> that you have to have some kind of request class that wraps up the
> request the user is trying to create, so that you can handle an array of
> those requests.  The return value has to be something like Map<Node,
> Try> to represent which nodes failed and succeeded.  This is the
> kind of stuff that, in my opinion, makes people scratch their heads.
>
> If we need to, we can still get some of the efficiency benefits of batch
> APIs by waiting for a millisecond or two before sending out a topic
> create() request to see if other create() requests arrive.  If so, we
> can coalesce them.  It might be better to figure out if this is an
> actual performance issue before implementing it, though.
>
> I think it would be good to get something out there, annotate it as
> @Unstable, and get feedback from people building against trunk and using
> it.  We have removed or changed @Unstable APIs in streams before, so I
> don't think we should worry that it will get set in stone prematurely.
> The AdminClient API should get much less developer use than anything in
> streams, so changing an unstable API should be much easier.
>
> best,
> Colin
>
>
> On Wed, Feb 8, 2017, at 07:49, Ismael Juma wrote:
> > Thanks for elaborating Jay. I totally agree that we have to be very
> > careful
> > in how we use our complexity budget. Easier said than done when people
> > don't agree on what is complex and what is simple. :) For example, I
> > think
> > batch APIs are a significant source of complexity as you have to do a
> > bunch
> > of ceremony to group things before sending the request and error handling
> > becomes more complex due to partial failures (things like `Try` or other
> > mechanisms that serve a similar role are then needed).
> >
> > Maybe a way forward is to write API usage examples to help validate that
> > the suggested API is indeed easy to use.
> >
> > Ismael
> >
> > On Wed, Feb 8, 2017 at 4:40 AM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Totally agree on CompletableFuture. Also agree with some of the rough
> edges
> > > on the Consumer.
> > >
> > > I don't have much of a leg to stand on with the splitting vs not
> splitting
> > > thing, really hard to argue one or the other is better. I guess the one
> > > observation in watching us try to make good public apis over the years
> is I
> > > am kind of in favor of a particular kind of simple. In particular I
> think
> > > since the bar is sooo high in support and docs and the community of
> users
> > > so broad in the range of their capabilities, it makes it so there is a
> lot
> > > of value in dead simple interfaces that don't have a lot of conceptual
> > > weight, don't introduce a lot of new classes or concepts or general
> > > patterns that must be understood to use them correctly. So things like
> > > nesting, or the Try class, or async apis, or even just a complex set of
> > > classes representing arguments or return values kind of all stack
> > > conceptual burdens on the user to figure out correct usage. So like,
> for
> > > example, the Try class is very elegant and represents a w

Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-07 Thread Jay Kreps
Totally agree on CompletableFuture. Also agree with some of the rough edges
on the Consumer.

I don't have much of a leg to stand on with the splitting vs not splitting
thing, really hard to argue one or the other is better. I guess the one
observation in watching us try to make good public apis over the years is I
am kind of in favor of a particular kind of simple. In particular I think
since the bar is sooo high in support and docs and the community of users
so broad in the range of their capabilities, it makes it so there is a lot
of value in dead simple interfaces that don't have a lot of conceptual
weight, don't introduce a lot of new classes or concepts or general
patterns that must be understood to use them correctly. So things like
nesting, or the Try class, or async apis, or even just a complex set of
classes representing arguments or return values kind of all stack
conceptual burdens on the user to figure out correct usage. So like, for
example, the Try class is very elegant and represents a whole generalized
class of possibly completed actions, but the flip side is maybe I'm just a
working guy who needs to list his kafka topics but doesn't know Rust, take
pity on me! :-)

Nit picking aside, super excited to see us progress on this.

-Jay

On Tue, Feb 7, 2017 at 3:46 PM Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Jay,
>
> Thanks for the feedback. Comments inline.
>
> On Tue, Feb 7, 2017 at 8:18 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> >- I think it would be good to not use "get" as the prefix for things
> >making remote calls. We've tried to avoid the java getter convention
> >entirely (see code style guide), but for remote calls in particular it
> > kind
> >of blurs the line between field access and remote RPC in a way that
> > leads
> >people to trouble. What about, e.g., fetchAllGroups() vs
> getAllGroups().
> >
>
> Agreed that we should avoid the `get` prefix for remote calls. There are a
> few possible prefixes for the read operations: list, fetch, describe.
>
>
> >- I think futures and callbacks are a bit of a pain to use. I'd second
> >Becket's comment: let's ensure there a common use case motivating
> these
> >that wouldn't be just as easily satisfied with batch operations (which
> > we
> >seem to have at least for some things). In terms of flexibility
> > Callbacks >
> >Futures > Batch Ops but I think in terms of usability it is the exact
> >opposite so let's make sure we have worked out how the API will be
> used
> >before deciding. In particular I think java Futures are often an
> >uncomfortable half-way point since calling get() and blocking the
> > thread is
> >often not what you want for chaining sequences of operations in a
> truly
> >async way, so 99% of people just use the future as a way to batch
> calls.
> >
>
> We should definitely figure out how the APIs are going to be used before
> deciding. I agree that callbacks are definitely painful and there's little
> reason to expose them in a modern API unless it's meant to be very low
> level. When it comes to Futures, I think it's important to distinguish what
> is available in Java 7 and below versus what is available from Java 8
> onwards. CompletableFuture makes it much easier to compose/chain operations
> (in a similar vein to java.util.Stream, our own Streams API, etc.) and it
> gives you the ability to register callbacks if you really want to (avoiding
> the somewhat odd situation in the Producer where we return a Future _and_
> allow you to pass a callback).
>
>
> >- Personally I don't think splitting the admin methods up actually
> makes
> >things more usable. It just makes you have to dig through our
> > hierarchy. I
> >think a flat class with a bunch of operations (like the consumer api)
> is
> >probably the easiest for people to grok and find things on. I am kind
> > of a
> >dumb PHP programmer at heart, though.
> >
>
> I am not sure it's fair to compare the AdminClient with the Consumer. The
> former has APIs for a bunch of unrelated APIs (topics, ACLs, configs,
> consumer groups, delegation tokens, preferred leader election, partition
> reassignment, etc.) where the latter is pretty specialised. For each of the
> resources, you may have 3-4 operations, it will get confusing fast. Also,
> do you really think an API that has one level of grouping will mean that
> users have to "dig through our hierarchy"? Or are you concerned that once
> we go in that direction, there is a danger of making the hierarchy more
> complicated?
>
> Finally, I am not sure I would use the consumer as an example of something
> that is easy to grok. :) The fact that methods behave pretty differently
> (some are blocking while others only have an effect after poll) with no
> indication from the type signature or naming convention makes it harder,
> not easier, to understand.
>
> Ismael
>


Re: [DISCUSS] KIP-117: Add a public AdministrativeClient API for Kafka admin operations

2017-02-07 Thread Jay Kreps
Hey Colin,

This is great, thought I'd throw out a couple of opinions to the mix, feel
free to ignore:

   - I think it would be good to not use "get" as the prefix for things
   making remote calls. We've tried to avoid the java getter convention
   entirely (see code style guide), but for remote calls in particular it kind
   of blurs the line between field access and remote RPC in a way that leads
   people to trouble. What about, e.g., fetchAllGroups() vs getAllGroups().
   - I think futures and callbacks are a bit of a pain to use. I'd second
   Becket's comment: let's ensure there a common use case motivating these
   that wouldn't be just as easily satisfied with batch operations (which we
   seem to have at least for some things). In terms of flexibility Callbacks >
   Futures > Batch Ops but I think in terms of usability it is the exact
   opposite so let's make sure we have worked out how the API will be used
   before deciding. In particular I think java Futures are often an
   uncomfortable half-way point since calling get() and blocking the thread is
   often not what you want for chaining sequences of operations in a truly
   async way, so 99% of people just use the future as a way to batch calls.
   - Personally I don't think splitting the admin methods up actually makes
   things more usable. It just makes you have to dig through our hierarchy. I
   think a flat class with a bunch of operations (like the consumer api) is
   probably the easiest for people to grok and find things on. I am kind of a
   dumb PHP programmer at heart, though.

-Jay

On Tue, Feb 7, 2017 at 10:19 AM, Colin McCabe  wrote:

> On Tue, Feb 7, 2017, at 08:37, Ismael Juma wrote:
> > Hi all,
> >
> > I think it's good that we have discussed a number of API that would make
> > sense in the AdminClient. Having done that, I think we should now narrow
> > the focus of this KIP to a small set of methods to get us started. Once
> > we
> > have an AdminClient in the codebase, we can propose subsequent KIPs to
> > enrich it. I would suggest the following:
> >
> > 1. Let's focus on topic management operations: describe, create, alter
> > and
> > delete topics.
> > 2. Let's add an @Unstable annotation to the class and specify in the
> > javadoc that the methods are subject to change (if necessary).
> >
> > Thoughts?
>
> +1.
>
> I'm going to reorganize the proposal a bit to use futures and to have
> the grouping by API type proposed earlier.
>
> best,
> Colin
>
> >
> > Ismael
> >
> > On Fri, Feb 3, 2017 at 6:24 PM, Colin McCabe  wrote:
> >
> > > On Thu, Feb 2, 2017, at 21:45, Becket Qin wrote:
> > > > Hi Colin,
> > > >
> > > > Thanks for the KIP. An admin client is probably a must after we block
> > > > direct access to ZK. Some comments and thoughts below:
> > > >
> > > > 1. Do we have a clear scope for the admin client? It might be worth
> > > > thinking about the entire user experience of the admin client.
> Ideally we
> > > > may want to have a single client to do all the administrative work
> > > > instead
> > > > of having multiple ways to do different things. For example, do we
> want
> > > > to
> > > > add topic configurations change API in the admin client? What about
> > > > partition movements and preferred leader election? Those are also
> > > > administrative tasks which seem reasonable to be integrated into the
> > > > admin
> > > > client.
> > >
> > > Thanks for the comments, Becket!
> > >
> > > I agree that topic configuration change should be in the administrative
> > > client.  I have not thought about partition movement or preferred
> leader
> > > election.  It probably makes sense to put them in the client as well,
> > > but we should probably have a longer discussion about those features
> > > later when someone is ready to implement them ;)
> > >
> > > best,
> > > Colin
>


Re: [ANNOUNCE] New committer: Grant Henke

2017-01-11 Thread Jay Kreps
Congrats Grant!

-Jay

On Wed, Jan 11, 2017 at 11:51 AM, Gwen Shapira  wrote:

> The PMC for Apache Kafka has invited Grant Henke to join as a
> committer and we are pleased to announce that he has accepted!
>
> Grant contributed 88 patches, 90 code reviews, countless great
> comments on discussions, a much-needed cleanup to our protocol and the
> on-going and critical work on the Admin protocol. Throughout this, he
> displayed great technical judgment, high-quality work and willingness
> to contribute where needed to make Apache Kafka awesome.
>
> Thank you for your contributions, Grant :)
>
> --
> Gwen Shapira
> Product Manager | Confluent
> 650.450.2760 | @gwenshap
> Follow us: Twitter | blog
>


Re: [VOTE] KIP-106 - Default unclean.leader.election.enabled True => False

2017-01-11 Thread Jay Kreps
+1

On Wed, Jan 11, 2017 at 10:56 AM, Ben Stopford  wrote:

> Looks like there was a good consensus on the discuss thread for KIP-106 so
> lets move to a vote.
>
> Please chime in if you would like to change the default for
> unclean.leader.election.enabled from true to false.
>
> https://cwiki.apache.org/confluence/display/KAFKA/%
> 5BWIP%5D+KIP-106+-+Change+Default+unclean.leader.
> election.enabled+from+True+to+False
>
> B
>


Re: [DISCUSS] KIP-105: Addition of Record Level for Sensors

2017-01-05 Thread Jay Kreps
This is great! A couple of quick comments:

   1. It'd be good to make the motivation a bit more clear. I think the
   motivation is "We want to have lots of partition/task/etc metrics but we're
   concerned about the performance impact so we want to disable them by
   default." Currently the motivation section is more about the proposed
   change and doesn't really make clear why.
   2. Do we have a microbenchmark that shows that the performance of (1)
   enabled metrics, (2) disabled metrics, (3) no metrics? This would help
   build the case for needing this extra knob. Obviously if metrics are cheap
   you would always just leave them enabled and not worry about it. I think
   there should be some cost because we are at least taking a lock during the
   recording but I'm not sure how material that is.
   3. One consideration in how this exposed: we always found the ability to
   dynamically change the logging level in JMX for log4j pretty useful. I
   think if we want to leave the door open to add this ability to enable
   metrics at runtime it may have some impact on the decision around how
   metrics are registered/reported.

-Jay

On Thu, Jan 5, 2017 at 9:59 AM, Guozhang Wang  wrote:

> I thought about "not registering at all" and left a comment on the PR as
> well regarding this idea. My concern is that it may be not very
> straight-forward to implement though via the MetricsReporter interface, if
> Eno and Aarti has a good approach to tackle it I would love it.
>
>
> Guozhang
>
> On Thu, Jan 5, 2017 at 5:34 AM, Eno Thereska 
> wrote:
>
> > Updated KIP for 1. Waiting to hear from Guozhang on 2 and then we can
> > proceed.
> >
> > Thanks
> > Eno
> > > On 5 Jan 2017, at 12:27, Ismael Juma  wrote:
> > >
> > > Thanks Eno. It would be good to update the KIP as well with regards to
> 1.
> > >
> > > About 2, I am not sure how adding a field could break existing tools.
> > > Having said that, your suggestion not to register sensors at all if
> their
> > > record level is below what is configured works for me.
> > >
> > > Ismael
> > >
> > > On Thu, Jan 5, 2017 at 12:07 PM, Eno Thereska 
> > > wrote:
> > >
> > >> Thanks Ismael. Already addressed 1. in the PR.
> > >>
> > >> As for 2. I'd prefer not adding extra info to the metrics reporters at
> > >> this point, since it might break existing tools out there (e.g., if we
> > add
> > >> things like configured level). Existing tools might be expecting the
> > info
> > >> to be reported in a particular format.
> > >>
> > >> If the current way is confusing, I think the next best alternative is
> to
> > >> not register sensors at all if their record level is below what is
> > >> configured. That way they don't show up at all. This will require some
> > more
> > >> code in Sensors.java to check at every step, but I think it's clean
> from
> > >> the user's point of view.
> > >>
> > >> Eno
> > >>
> > >>
> > >>> On 5 Jan 2017, at 11:23, Ismael Juma  wrote:
> > >>>
> > >>> Thanks for the KIP, it seems like a good improvement. A couple of
> > >> comments:
> > >>>
> > >>> 1. As Jun asked in the PR, do we need a broker config as well? The
> > broker
> > >>> uses Kafka Metrics for some metrics, but we probably don't have any
> > debug
> > >>> sensors at the broker yet. Either way, it would be good to describe
> the
> > >>> thinking around this.
> > >>>
> > >>> 2. The behaviour with regards to the metrics reporter could be
> > >> surprising.
> > >>> It would be good to elaborate a little more on this aspect. For
> > example,
> > >>> maybe we want to expose the sensor level and the current configured
> > level
> > >>> to metric reporters. That could then be used to build the debug/info
> > >>> dashboard that Eno mentioned (while making it clear that some metrics
> > >>> exist, but are not currently being recorded).
> > >>>
> > >>> Ismael
> > >>>
> > >>> On Thu, Jan 5, 2017 at 10:37 AM, Eno Thereska <
> eno.there...@gmail.com>
> > >>> wrote:
> > >>>
> >  Correct on 2. Guozhang: the sensor will be registered and polled by
> a
> >  reporter, but the metrics associated with it will not be updated.
> > 
> >  That would allow a user to have, for example, a debug dashboard and
> an
> >  info dashboard.
> > 
> >  Updated KIP to make this clear.
> > 
> >  Thanks
> >  Eno
> > 
> > 
> > > On 4 Jan 2017, at 18:00, Aarti Gupta 
> wrote:
> > >
> > > Thanks for the review, Guozhang,
> > >
> > > Addressed 2 out of the three comments,
> > >
> > > 1. Fixed and updated KIP (swapped code variable name
> > > METRICS_RECORD_LEVEL_CONFIG with config name metrics.record.level)
> > >
> > > 3. >>Could you elaborate on the "shouldRecord()" function, e.g.
> which
> >  class
> > > it will be added to? Does it contain any parameters?
> > >
> > > Added more details on 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-01-04 Thread Jay Kreps
Hey Jun,

We had a proposal like this previously. The suppression scheme was slightly
different. Rather than than attempting to recopy or swap, there was instead
an aborted offset index maintained along with each segment containing a
sequential list of aborted offsets. The filtering would happen at fetch
time and would just ensure that fetch requests never span an aborted
transaction. That is, if you did a fetch request which would include
offsets 7,8,9,10,11, but offsets 7 and 10 appears in the index of aborted
transactions, then the fetch would return 8,9 only even if there was more
space in the fetch response. This leads to minimal overhead, but
potentially would give back smaller fetch responses if transactions are
being continually aborted.

One downside to this approach (both your proposal and the variation that I
just described is that it does not allow the possibility of consuming in
transaction commit order. Consuming in transaction commit order means that
the only delay you incur is the delay in committing a given transaction.
Consuming in offset order means you cannot consume a given offset until ALL
previously begun transactions are committed or aborted. KIP-98 doesn't
propose making this change now, but since it is consumer side it is
possible.

-Jay

On Tue, Jan 3, 2017 at 7:50 AM, Jun Rao  wrote:

> Just to follow up on Radai's idea of pushing the buffering logic to the
> broker. It may be possible to do this efficiently if we assume aborted
> transactions are rare. The following is a draft proposal. For each
> partition, the broker maintains the last stable offset (LSO) as described
> in the document, and only exposes messages up to this point if the reader
> is in the read-committed mode. When a new stable offset (NSO) is
> determined, if there is no aborted message in this window, the broker
> simply advances the LSO to the NSO. If there is at least one aborted
> message, the broker first replaces the current log segment with new log
> segments excluding the aborted messages and then advances the LSO. To make
> the replacement efficient, we can replace the current log segment with 3
> new segments: (1) a new "shadow" log segment that simply references the
> portion of the current log segment from the beginning to the LSO, (2) a log
> segment created by copying only committed messages between the LSO and the
> NSO, (3) a new "shadow" log segment that references the portion of the
> current log segment from the NSO (open ended). Note that only (2) involves
> real data copying. If aborted transactions are rare, this overhead will be
> insignificant. Assuming that applications typically don't abort
> transactions, transactions will only be aborted by transaction coordinators
> during hard failure of the producers, which should be rare.
>
> This way, the consumer library's logic will be simplified. We can still
> expose uncommitted messages to readers in the read-uncommitted mode and
> therefore leave the door open for speculative reader in the future.
>
> Thanks,
>
> Jun
>
>
> On Wed, Dec 21, 2016 at 10:44 AM, Apurva Mehta 
> wrote:
>
> > Hi Joel,
> >
> > The alternatives are embedded in the 'discussion' sections which are
> spread
> > throughout the google doc.
> >
> > Admittedly, we have not covered high level alternatives like those which
> > have been brought up in this thread. In particular, having a separate log
> > for transactional mesages and also having multiple producers participate
> in
> > a single transaction.
> >
> > This is an omission which we will correct.
> >
> > Thanks,
> > Apurva
> >
> > On Wed, Dec 21, 2016 at 10:34 AM, Joel Koshy 
> wrote:
> >
> > > >
> > > >
> > > > @Joel,
> > > >
> > > > I read over your wiki, and apart from the introduction of the notion
> of
> > > > journal partitions --whose pros and cons are already being
> discussed--
> > > you
> > > > also introduce the notion of a 'producer group' which enables
> multiple
> > > > producers to participate in a single transaction. This is completely
> > > > opposite of the model in the KIP where a transaction is defined by a
> > > > producer id, and hence there is a 1-1 mapping between producers and
> > > > transactions. Further, each producer can have exactly one in-flight
> > > > transaction at a time in the KIP.
> > > >
> > >
> > > Hi Apurva - yes I did notice those differences among other things :)
> > BTW, I
> > > haven't yet gone through the google-doc carefully but on a skim it does
> > not
> > > seem to contain any rejected alternatives as the wiki states.
> > >
> >
>


Re: [VOTE] KIP-90 Remove zkClient dependency from Streams

2016-12-20 Thread Jay Kreps
+1

On Tue, Dec 20, 2016 at 1:01 PM, Hojjat Jafarpour 
wrote:

> Hi all,
>
> Seems that there is no opposition to this KIP. This email is to start the
> voting for this KIP.
> Once again the KIP is for removing zkClient dependency from Streams. Please
> check out the KIP page:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 90+-+Remove+zkClient+
> dependency+from+Streams
>
> Thanks,
> --Hojjat
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-20 Thread Jay Kreps
I don't think the simple approach of writing to a local store (in memory or
on disk) and then copying out to the destination topics would work but
there could well be more sophisticated things that would. As you say, it is
fine for the data to be un-replicated while you are accumulating the
transaction, because you can always just abort the transaction if that node
fails, but once you decided to commit and begin the process of copying out
data you must guarantee you eventually will copy out the full transaction.
If you have a non-durable store on one broker, and that broker crashes in
the middle of copying out the transaction to the destination brokers, if it
is possible that some of the writes have already succeeded, and the others
are now lost, then you would violate atomicity.

This is similar in classic two-phase commit protocols: a post-condition of
a successful prepare commit is a promise that the transaction will
eventually be successfully committed if requested so full durability is
required in the pre-commit phase.

But the flaw in the simple approach doesn't mean there isn't some less
obvious solution that hasn't been thought of yet.

For latency, yeah you're exactly right. We're assuming the latency of
transactions can be pushed down to almost the duration of the transaction
and obviously it can't be less than that. Let me try to flesh out the
motivation for caring about latency (I think Sriram touched on this):

   - We're primarily motivated by uses that fit a generalized notion of
   correct, stateful stream processing. That is you consume/process/produce
   potentially with associated local state in the processing. This fits KS and
   Samza, but potentially a whole world of things that do transformation of
   data. I think this is a really general notion of stream processing as a
   kind of "protocol" and the proposed semantics give a kind of "closure" to
   Kafka's producer and consumer protocols so they can be correctly chained.
   - These use cases end up being a kind of DAG of transformations, often
   even a fairly simple flow will have a depth of 5 stages and more realistic
   flows can be more like 10.
   - The transaction size is proportional to the efficiency since the
   overhead of the transaction is fixed irrespective of the number of
   messages. A transaction with two messages will be extremely inefficient,
   but one with a few thousand should be much better. So you can't comfortably
   make the transactions too small but yes you probably wouldn't need them to
   be multisecond.
   - The latency of the transactions stack up with the stages in the DAG in
   a naive usage. Say you commit every 100ms, if you have 10 stages your
   latency is going to be 1 second.
   - This latency is definitely a concern in many domains. This is why we
   are interested in having the option of supporting speculative execution.
   For speculative execution you assume likely processes won't fail and you go
   ahead and compute downstream results but co-ordinate the commit. This
   trades more work rolling back when there are failures for lower latency.
   This lets you push the end-to-end latency closer to 100ms rather than the
   100ms*num_stages.

Hopefully that gives a bit more color on the latency concern and desire for
"read uncommitted".

-Jay

On Tue, Dec 20, 2016 at 10:33 AM, radai <radai.rosenbl...@gmail.com> wrote:

> obviously anything committed would need to be replicated to all followers -
> just like current msgs.
>
> what im trying to say is that in-flight data (written as part of an ongoing
> TX and not committed yet) does not necessarily need to be replicated, or
> even written out to disk. taken to the extreme it means i can buffer in
> memory on the leader alone and incur no extra writes at all.
>
> if you dont want to just buffer in-memory on the leader (or are forced to
> spool to disk because of size) you could still avoid a double write by
> messing around with segment files (so the TX file becomes part of the
> "linked-list" of segment files instead of reading it and appending it's
> contents verbatim to the current segment file).
>
> the area when this does inevitably come short is latency and "read
> uncommitted" (which are related). the added delay (after cutting all the
> corners above) would really be the "time span" of a TX - the amount of time
> from the moment the producer started the TX to the time when it was
> committed. in my mind this time span is very short. am I failing to
> understand the proposed "typical" use case? is the plan to use long-running
> transactions and only commit at, say, 5 minute "checkpoints" ?
>
> On Tue, Dec 20, 2016 at 10:00 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Cool. It sounds like you guys will sync up and come up with a specific
> > proposal. I thin

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-20 Thread Jay Kreps
; > > was that it will void the offset ordering and enforce people to accept
> > > "transaction ordering", that is, consumer will not see messages from
> the
> > > same partition in the order where they were produced, but only in the
> > order
> > > of when the corresponding transaction was committed. For some
> scenarios,
> > we
> > > believe that offset ordering would still be preferred than transaction
> > > ordering and that is why in KIP-98 proposal we default to the former
> > while
> > > leave the door open if users want to switch to the latter case.
> > >
> > >
> > > Guozhang
> > >
> > > On Mon, Dec 19, 2016 at 10:56 AM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Hey Radai,
> > > >
> > > > I'm not sure if I fully understand what you are proposing, but I
> > > > interpreted it to be similar to a proposal we worked through back at
> > > > LinkedIn. The proposal was to commit to a central txlog topic, and
> then
> > > > recopy to the destination topic upon transaction commit. The
> > observation
> > > on
> > > > that approach at the time were the following:
> > > >
> > > >1. It is cleaner since the output topics have only committed data!
> > > >2. You need full replication on the txlog topic to ensure
> atomicity.
> > > We
> > > >weren't able to come up with a solution where you buffer in memory
> > or
> > > > use
> > > >renaming tricks the way you are describing. The reason is that
> once
> > > you
> > > >begin committing you must ensure that the commit eventually
> succeeds
> > > to
> > > >guarantee atomicity. If you use a transient store you might commit
> > > some
> > > >data and then have a server failure that causes you to lose the
> rest
> > > of
> > > > the
> > > >transaction.
> > > >3. Having a single log allows the reader to choose a "read
> > > uncommitted"
> > > >mode that hands out messages immediately. This is important for
> > cases
> > > > where
> > > >latency is important, especially for stream processing topologies
> > > where
> > > >these latencies stack up across multiple stages.
> > > >
> > > > For the stream processing use case, item (2) is a bit of a deal
> killer.
> > > > This takes the cost of a transient message write (say the
> intermediate
> > > > result of a stream processing topology) from 3x writes (assuming 3x
> > > > replication) to 6x writes. This means you basically can't default it
> > on.
> > > If
> > > > we can in fact get the cost down to a single buffered write (i.e. 1x
> > the
> > > > data is written to memory and buffered to disk if the transaction is
> > > large)
> > > > as in the KIP-98 proposal without too many other negative side
> effects
> > I
> > > > think that could be compelling.
> > > >
> > > > -Jay
> > > >
> > > >
> > > >
> > > > On Mon, Dec 19, 2016 at 9:36 AM, radai <radai.rosenbl...@gmail.com>
> > > wrote:
> > > >
> > > > > regarding efficiency:
> > > > >
> > > > > I'd like to distinguish between server efficiency (resource
> > utilization
> > > > of
> > > > > the broker machine alone) and overall network efficiency (resource
> > > > > utilization on brokers, producers and consumers, including network
> > > > > traffic).
> > > > > my proposal is not as resource-efficient on the broker (although it
> > can
> > > > be,
> > > > > depends on a few trade offs and implementation details). HOWEVER,
> if
> > i
> > > > look
> > > > > at the overall efficiency:
> > > > >
> > > > >1.clients would need to either buffer or double-read uncommitted
> > > msgs.
> > > > > for N clients reading the stream M times (after re-starts and
> > > reconsumes)
> > > > > this would mean a M*N factor in either network BW or disk/memory
> > space
> > > > > (depends on if buffer vs re-read). potentially N*M more broker-side
> > > reads
> > > > > too.
> > > > >2 to reduce the broker side cost several things can be done
&g

Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag

2016-12-19 Thread Jay Kreps
Makes sense!

-Jay

On Mon, Dec 19, 2016 at 2:40 PM, Michael Pearce <michael.pea...@ig.com>
wrote:

> Wow just read that def over tired. Hopefully it makes sense. Or you get
> the gist at least.
>
> 
> From: Michael Pearce <michael.pea...@ig.com>
> Sent: Monday, December 19, 2016 9:19:02 PM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
>
> Hi Jay,
>
> Agreed this stemmed as offshoot from KIP-82.
>
> Which our main driver for was to be able to have some headers for a null
> value as such for our routing, audit, tracing and a few other bits which
> currently we are forced to do with a message wrapper, if we all agreed on
> KIP-82 that we need native headers and look to implement that the push for
> this would dissipate.
>
> This KIP would allow for though one use case that comes to mind we could
> see which is to have business data with a delete. Though as said this isn't
> something we are pushing for think really we would have.
>
> As such in summary yes, if you want to fully support KIP-82 and we can get
> that agreed in principle and a target release version, I think quite a few
> guys at LinkedIn are quite pro it too ;) I'm happy to drop this one.
>
> Cheers
> Mike
>
> Sent using OWA for iPhone
> 
> From: Jay Kreps <j...@confluent.io>
> Sent: Monday, December 19, 2016 8:51:23 PM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
>
> Hey Michael,
>
> Here is the compatibility concern I have:
>
>1. You have a consumer app that relies on value == null to indicate a
>delete (current semantics).
>2. You upgrade Kafka and your clients.
>3. Some producer starts using the tombstone field in combination with
>non-null.
>
> I share Ismael's dislike of setting tombstones on records with null values.
> This makes sense as a transitional state, but as an end state its a bit
> weird. You'd expect to be able to mix null values and tombstones, and have
> the null values remain and the tombstones get compacted. However what will
> happen is both will be compacted and upon debugging this you'll learn that
> we sometimes use null in the value to indicate tombstone. Ismael's solution
> is a bigger compatibility break, though, so not sure if that is better.
>
> My other question is the relationship to KIP-82. My read is that this KIP
> solves some but not all of the problems KIP-82 is intended for. KIP-82, on
> the other hand, seems to address most of the motivating uses for this KIP.
> The exception is maybe item (5) on the list where you want to simultaneous
> delete and convey some information to subscribers, but I couldn't construct
> a concrete examples for that one. Do we need to rationalize these two KIPs?
> That is, do you still advocate this proposal if we do KIP-82 and vice
> versa? As you may have noticed I'm somewhat emotionally invested in the
> simplicity of the core data model, so my default position is let's try to
> avoid stuffing more stuff in, but if we have to add stuff I like each of
> these individually more than doing both. :-)
>
> -Jay
>
>
>
>
> On Fri, Dec 16, 2016 at 12:16 PM, Michael Pearce <michael.pea...@ig.com>
> wrote:
>
> > Hi Jay
> >
> > I disagree here that we are breaking any compatibility, we went through
> > this on the discussion thread in fact with the help of that thread is how
> > the kip came to the solution.
> >
> > Also on the supported combinations front you mention, we are not taking
> > anything away afaik.
> >
> > Currently supported are only are:
> > Null value = delete
> > Non-null value = non delete
> >
> > With this kip we would support
> > Null value + tombstone = delete
> > Non null value + tombstone = delete
> > Non null value + no tombstone = non delete
> >
> > As for the alternative idea, this is simply a new policy, how can there
> be
> > confusion here? For this policy it would be explicit that tombstone
> marker
> > would need to be set for a delete.
> >
> > I'm going to vent a little now as starting to get quite frustrated.
> >
> > We are going round in circles on kip-82 as per use cases there is now
> many
> > use cases, how many more are needed? just because confluent don't see
> these
> > doesn't mean they aren't real use cases other have, this is the point of
> > the Apache foundation, it shouldn't be the view of just one organisation.
> > It really is getting a feeling of the NIH syndrome. Rather than it being
> > constructive on discussi

Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag

2016-12-19 Thread Jay Kreps
Hey Michael,

Here is the compatibility concern I have:

   1. You have a consumer app that relies on value == null to indicate a
   delete (current semantics).
   2. You upgrade Kafka and your clients.
   3. Some producer starts using the tombstone field in combination with
   non-null.

I share Ismael's dislike of setting tombstones on records with null values.
This makes sense as a transitional state, but as an end state its a bit
weird. You'd expect to be able to mix null values and tombstones, and have
the null values remain and the tombstones get compacted. However what will
happen is both will be compacted and upon debugging this you'll learn that
we sometimes use null in the value to indicate tombstone. Ismael's solution
is a bigger compatibility break, though, so not sure if that is better.

My other question is the relationship to KIP-82. My read is that this KIP
solves some but not all of the problems KIP-82 is intended for. KIP-82, on
the other hand, seems to address most of the motivating uses for this KIP.
The exception is maybe item (5) on the list where you want to simultaneous
delete and convey some information to subscribers, but I couldn't construct
a concrete examples for that one. Do we need to rationalize these two KIPs?
That is, do you still advocate this proposal if we do KIP-82 and vice
versa? As you may have noticed I'm somewhat emotionally invested in the
simplicity of the core data model, so my default position is let's try to
avoid stuffing more stuff in, but if we have to add stuff I like each of
these individually more than doing both. :-)

-Jay




On Fri, Dec 16, 2016 at 12:16 PM, Michael Pearce <michael.pea...@ig.com>
wrote:

> Hi Jay
>
> I disagree here that we are breaking any compatibility, we went through
> this on the discussion thread in fact with the help of that thread is how
> the kip came to the solution.
>
> Also on the supported combinations front you mention, we are not taking
> anything away afaik.
>
> Currently supported are only are:
> Null value = delete
> Non-null value = non delete
>
> With this kip we would support
> Null value + tombstone = delete
> Non null value + tombstone = delete
> Non null value + no tombstone = non delete
>
> As for the alternative idea, this is simply a new policy, how can there be
> confusion here? For this policy it would be explicit that tombstone marker
> would need to be set for a delete.
>
> I'm going to vent a little now as starting to get quite frustrated.
>
> We are going round in circles on kip-82 as per use cases there is now many
> use cases, how many more are needed? just because confluent don't see these
> doesn't mean they aren't real use cases other have, this is the point of
> the Apache foundation, it shouldn't be the view of just one organisation.
> It really is getting a feeling of the NIH syndrome. Rather than it being
> constructive on discussion of the implementation detail.
>
> kip-87 spawned from as on the kip call we all agreed this was needed. And
> would at least allow a custom wrapper be supported in a compacted topic,
> allowing meta data. Which again now I feel we are spinning wheels, and
> simply finding reasons not support it.
>
> Cheers
> Mike
>
>
>
> Sent using OWA for iPad
> 
> From: Jay Kreps <j...@confluent.io>
> Sent: Friday, December 16, 2016 7:09:23 PM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
>
> Hey Michael,
>
> I do think it might have been better had we started with a separate concept
> of null vs delete. Given that we are where we are, I'm not sure that the
> possible cures we explored so far are better than the disease.
>
> I apologize for coming to this late, but I didn't really understand the
> implications of the KIP and that we'd be breaking compatibility with
> existing apps until the vote had begun, and in my defense I'm not sure the
> other folks voting did either.
>
> I think we all agree there are many existing apps that are built with the
> assumption of "null value non-tombstone" and it isn't possible to
> disambiguate these from tombstones on the producer. It isn't that anyone is
> saying we have to support all four possibilities at once, it's that we
> simply can't orphan one of the existing combinations or our users will eat
> us!
>
> If I've understood your alternate solution of adding another setting for
> compaction, I think this does fix the compatibility problem, but it adds an
> odd mode the user has to add on all their topics. While the current state
> is easily explainable, the resulting state where the meaning of tombstone
> and null are overlapping and ambiguous and dependent on a compaction
> setting that could change out of band or not 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-19 Thread Jay Kreps
less the consumer
> > consumes from all the partitions involved in a transactions, it seems
> > impossible for it to deliver *all* the messages in a transaction, right?
> A
> > weaker guarantee is we will deliver all or none of the messages that
> belong
> > to the same transaction in ONE partition, but this would be different
> from
> > the guarantee from the producer side.
> >
> > My two cents on Radai's sideways partition design:
> > 1. If we consider the producer side behavior as doing a two phase commit
> > which including the committing the consumer offsets, it is a little
> awkward
> > that we allow uncommitted message goes into the main log and rely on the
> > consumer to filter out. So semantic wise I think it would be better if we
> > can avoid this. Radai's suggestion is actually intuitive because if the
> > brokers do not want to expose uncommitted transactions to the consumer,
> the
> > brokers have to buffer it.
> >
> > 2. Regarding the efficiency. I think may be it worth looking at the
> > efficiency cost v.s benefit. The efficiency includes both server side
> > efficiency and consumer side efficiency.
> >
> > Regarding the server side efficiency, the current proposal would probably
> > have better efficiency regardless of whether something goes wrong.
> Radai's
> > suggestion would put more burden on the server side. If nothing goes
> wrong
> > we always pay the cost of having double copy of the transactional
> messages
> > and do not get the semantic benefit. But if something goes wrong, the
> > efficiency cost we pay we get us a better semantic.
> >
> > For the consumer side efficiency, because there is no need to buffer the
> > uncommitted messages. The current proposal may have to potentially buffer
> > uncommitted messages so it would be less efficient than Radai's
> suggestion
> > when a transaction aborts. When everything goes well, both design seems
> > having the similar performance. However, it depends on whether we are
> > willing to loosen the consumer side transaction guarantee that I
> mentioned
> > earlier to Apurva.
> >
> > Currently the biggest pressure on the consumer side is that it has to
> > buffer incomplete transactions. There are two reasons for it,
> > A. A transaction may be aborted so we cannot expose the messages to the
> > users.
> > B. We want to return all or none of the messages in a transaction in ONE
> > partition.
> >
> > While reason A is mandatory, I think reason B may be discussable. Radai's
> > design actually removes reason A because there is no uncommitted messages
> > exposed to the consumers. This may potentially give us a chance to
> > significantly improve consumer side efficiency in normal cases. It again
> > depends on the use case, i.e. whether user can process a transaction
> > progressively (message by message) or it has to be buffered and returned
> > all together. If in most cases, users can process the transactions
> message
> > by message (most stream processing tasks probably can do so), then with
> > Radai's proposal we don't need to buffer the transactions for the users
> > anymore, which is a big difference. For the latter case, the consumer may
> > have to buffer the incomplete transactions otherwise we are just throwing
> > the burden onto the users.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Dec 16, 2016 at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Yeah good point. I relent!
> > >
> > > -jay
> > >
> > > On Fri, Dec 16, 2016 at 1:46 PM Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Jay/Ismael,
> > > >
> > > >
> > > >
> > > > I agree that lazy initialization of metadata seems unavoidable.
> > Ideally,
> > > we
> > > >
> > > > could follow the same pattern for transactions, but remember that in
> > the
> > > >
> > > > consumer+producer use case, the initialization needs to be completed
> > > prior
> > > >
> > > > to setting the consumer's position. Otherwise we risk reading stale
> > > >
> > > > offsets. But it would be pretty awkward if you have to begin a
> > > transaction
> > > >
> > > > first to ensure that your consumer can read the right offset from the
> > > >
> > > > consumer, right? It's a bit easier to explain that you should always
> > call
> > > >
>

Re: [DISCUSS] KIP-102 - Add close with timeout for consumers

2016-12-16 Thread Jay Kreps
I think this is great. Sounds like one implication is that existing code
that called close() and hit the timeout would now hang indefinitely. We saw
this kind of thing a lot in automated testing scenarios where people don't
correctly sequence their shutdown of client and server. I think this is
okay, but might be good to include in the release notes.

-jay

On Thu, Dec 15, 2016 at 5:32 AM, Rajini Sivaram  wrote:

Hi all,





I have just created KIP-102 to add a new close method for consumers with a


timeout parameter, making Consumer consistent with Producer:





https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers





Comments and suggestions are welcome.





Thank you...





Regards,





Rajini


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-16 Thread Jay Kreps
Yeah good point. I relent!

-jay

On Fri, Dec 16, 2016 at 1:46 PM Jason Gustafson  wrote:

> Jay/Ismael,
>
>
>
> I agree that lazy initialization of metadata seems unavoidable. Ideally, we
>
> could follow the same pattern for transactions, but remember that in the
>
> consumer+producer use case, the initialization needs to be completed prior
>
> to setting the consumer's position. Otherwise we risk reading stale
>
> offsets. But it would be pretty awkward if you have to begin a transaction
>
> first to ensure that your consumer can read the right offset from the
>
> consumer, right? It's a bit easier to explain that you should always call
>
> `producer.init()` prior to initializing the consumer. Users would probably
>
> get this right without any special effort.
>
>
>
> -Jason
>
>
>
> On Wed, Dec 14, 2016 at 1:52 AM, Rajini Sivaram 
> wrote:
>
>
>
> > Hi Apurva,
>
> >
>
> > Thank you for the answers. Just one follow-on.
>
> >
>
> > 15. Let me rephrase my original question. If all control messages
> (messages
>
> > to transaction logs and markers on user logs) were acknowledged only
> after
>
> > flushing the log segment, will transactions become durable in the
>
> > traditional sense (i.e. not restricted to min.insync.replicas failures) ?
>
> > This is not a suggestion to update the KIP. It seems to me that the
> design
>
> > enables full durability if required in the future with a rather
>
> > non-intrusive change. I just wanted to make sure I haven't missed
> anything
>
> > fundamental that prevents Kafka from doing this.
>
> >
>
> >
>
> >
>
> > On Wed, Dec 14, 2016 at 5:30 AM, Ben Kirwin  wrote:
>
> >
>
> > > Hi Apurva,
>
> > >
>
> > > Thanks for the detailed answers... and sorry for the late reply!
>
> > >
>
> > > It does sound like, if the input-partitions-to-app-id mapping never
>
> > > changes, the existing fencing mechanisms should prevent duplicates.
>
> > Great!
>
> > > I'm a bit concerned the proposed API will be delicate to program
> against
>
> > > successfully -- even in the simple case, we need to create a new
> producer
>
> > > instance per input partition, and anything fancier is going to need its
>
> > own
>
> > > implementation of the Streams/Samza-style 'task' idea -- but that may
> be
>
> > > fine for this sort of advanced feature.
>
> > >
>
> > > For the second question, I notice that Jason also elaborated on this
>
> > > downthread:
>
> > >
>
> > > > We also looked at removing the producer ID.
>
> > > > This was discussed somewhere above, but basically the idea is to
> store
>
> > > the
>
> > > > AppID in the message set header directly and avoid the mapping to
>
> > > producer
>
> > > > ID altogether. As long as batching isn't too bad, the impact on total
>
> > > size
>
> > > > may not be too bad, but we were ultimately more comfortable with a
>
> > fixed
>
> > > > size ID.
>
> > >
>
> > > ...which suggests that the distinction is useful for performance, but
> not
>
> > > necessary for correctness, which makes good sense to me. (Would a
> 128-bid
>
> > > ID be a reasonable compromise? That's enough room for a UUID, or a
>
> > > reasonable hash of an arbitrary string, and has only a marginal
> increase
>
> > on
>
> > > the message size.)
>
> > >
>
> > > On Tue, Dec 6, 2016 at 11:44 PM, Apurva Mehta 
>
> > wrote:
>
> > >
>
> > > > Hi Ben,
>
> > > >
>
> > > > Now, on to your first question of how deal with consumer rebalances.
>
> > The
>
> > > > short answer is that the application needs to ensure that the the
>
> > > > assignment of input partitions to appId is consistent across
>
> > rebalances.
>
> > > >
>
> > > > For Kafka streams, they already ensure that the mapping of input
>
> > > partitions
>
> > > > to task Id is invariant across rebalances by implementing a custom
>
> > sticky
>
> > > > assignor. Other non-streams apps can trivially have one producer per
>
> > > input
>
> > > > partition and have the appId be the same as the partition number to
>
> > > achieve
>
> > > > the same effect.
>
> > > >
>
> > > > With this precondition in place, we can maintain transactions across
>
> > > > rebalances.
>
> > > >
>
> > > > Hope this answers your question.
>
> > > >
>
> > > > Thanks,
>
> > > > Apurva
>
> > > >
>
> > > > On Tue, Dec 6, 2016 at 3:22 PM, Ben Kirwin  wrote:
>
> > > >
>
> > > > > Thanks for this! I'm looking forward to going through the full
>
> > proposal
>
> > > > in
>
> > > > > detail soon; a few early questions:
>
> > > > >
>
> > > > > First: what happens when a consumer rebalances in the middle of a
>
> > > > > transaction? The full documentation suggests that such a
> transaction
>
> > > > ought
>
> > > > > to be rejected:
>
> > > > >
>
> > > > > > [...] if a rebalance has happened and this consumer
>
> > > > > > instance becomes a zombie, even if this offset message is
> appended
>
> > in
>
> > > > the
>
> > > > > > offset topic, the transaction will be rejected later on 

Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag

2016-12-16 Thread Jay Kreps
Hey Michael,

Sorry if it's frustrating. Definitely not a Confluent thing, I think you've
seen both sides from different Confluent folks. I'm just trying to
understand the compatibility implications to get on board myself. Given
you're saying there is no compatibility implication I think maybe I'm
confused with how the proposal works. Let me see if I can go back through
an un-confuse myself.

-Jay

On Fri, Dec 16, 2016 at 12:16 PM, Michael Pearce <michael.pea...@ig.com>
wrote:

> Hi Jay
>
> I disagree here that we are breaking any compatibility, we went through
> this on the discussion thread in fact with the help of that thread is how
> the kip came to the solution.
>
> Also on the supported combinations front you mention, we are not taking
> anything away afaik.
>
> Currently supported are only are:
> Null value = delete
> Non-null value = non delete
>
> With this kip we would support
> Null value + tombstone = delete
> Non null value + tombstone = delete
> Non null value + no tombstone = non delete
>
> As for the alternative idea, this is simply a new policy, how can there be
> confusion here? For this policy it would be explicit that tombstone marker
> would need to be set for a delete.
>
> I'm going to vent a little now as starting to get quite frustrated.
>
> We are going round in circles on kip-82 as per use cases there is now many
> use cases, how many more are needed? just because confluent don't see these
> doesn't mean they aren't real use cases other have, this is the point of
> the Apache foundation, it shouldn't be the view of just one organisation.
> It really is getting a feeling of the NIH syndrome. Rather than it being
> constructive on discussion of the implementation detail.
>
> kip-87 spawned from as on the kip call we all agreed this was needed. And
> would at least allow a custom wrapper be supported in a compacted topic,
> allowing meta data. Which again now I feel we are spinning wheels, and
> simply finding reasons not support it.
>
> Cheers
> Mike
>
>
>
> Sent using OWA for iPad
> 
> From: Jay Kreps <j...@confluent.io>
> Sent: Friday, December 16, 2016 7:09:23 PM
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
>
> Hey Michael,
>
> I do think it might have been better had we started with a separate concept
> of null vs delete. Given that we are where we are, I'm not sure that the
> possible cures we explored so far are better than the disease.
>
> I apologize for coming to this late, but I didn't really understand the
> implications of the KIP and that we'd be breaking compatibility with
> existing apps until the vote had begun, and in my defense I'm not sure the
> other folks voting did either.
>
> I think we all agree there are many existing apps that are built with the
> assumption of "null value non-tombstone" and it isn't possible to
> disambiguate these from tombstones on the producer. It isn't that anyone is
> saying we have to support all four possibilities at once, it's that we
> simply can't orphan one of the existing combinations or our users will eat
> us!
>
> If I've understood your alternate solution of adding another setting for
> compaction, I think this does fix the compatibility problem, but it adds an
> odd mode the user has to add on all their topics. While the current state
> is easily explainable, the resulting state where the meaning of tombstone
> and null are overlapping and ambiguous and dependent on a compaction
> setting that could change out of band or not be in sync with your code in
> some environment seems worse to me then where we currently are. I think the
> question is how would this combination be explained to users and does it
> make sense?
>
> -Jay
>
>
>
> On Fri, Dec 16, 2016 at 9:25 AM, Michael Pearce <michael.pea...@ig.com>
> wrote:
>
> > Hi Chaps,
> >
> > Can we either get one more +1 binding (we have 2 already) on the
> existing?
> >
> > Or have some response on the below possible alternative. We are keen to
> > get working on this, so we make next feature release.
> >
> > Cheers
> > Mike
> >
> >
> > On 13/12/2016, 16:32, "Michael Pearce" <michael.pea...@ig.com> wrote:
> >
> > Hi Ismael
> >
> > Did you see our email this morning, what's your thoughts on this
> > approach to instead we simply have a brand new policy?
> >
> > Cheers
> > Mike
> >
> >
> > Sent using OWA for iPhone
> > 
> > From: isma...@gmail.com <isma...@gmail.com> on behalf of Ism

Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag

2016-12-16 Thread Jay Kreps
uctor into the tombstone flag.
> >
> > However, if you want this change to more generally decouple the idea
> of
> > deletion and null values, then you are sometimes converting what
> might be a
> > completely valid null value that doesn't indicate deletion into a
> > tombstone. Downstream applications could potentially handle these
> cases
> > differently given the separation of deletion from value.
> >
> > I guess the question is if we want to try to support the latter even
> for
> > topics where we have older produce requests. An example where this
> could
> > come up is in something like a CDC Connector. If we try to support
> the
> > semantic difference, a connector might write changes to Kafka using
> the
> > tombstone flag to indicate when a row was truly deleted (vs an
> update that
> > sets it to null but still present; this probably makes more sense
> for CDC
> > from document stores or extracting single columns). There are various
> > reasons we might want to maintain the full log and not turn
> compaction on
> > (or just use a time-based retention policy), but downstream
> applications
> > might care to know the difference between a delete and a null value.
> In
> > fact both versions of the same log (compacted and time-retention)
> could be
> > useful and I don't think it'll be uncommon to maintain both or use
> KIP-71
> > to maintain a hybrid compacted/retention topic.
> >
> > -Ewen
> >
> > On Sun, Dec 11, 2016 at 1:18 PM, Michael Pearce <
> michael.pea...@ig.com>
> > wrote:
> >
> > > Hi Jay,
> > >
> > > Why wouldn't that work, the tombstone value is only looked at by
> the
> > > broker, on a topic configured for compaction as such is benign on
> non
> > > compacted topics. This is as much as sending a null value currently
> > >
> > >
> > > Regards
> > > Mike
> > >
> > >
> > >
> > > Sent using OWA for iPhone
> > > 
> > > From: Jay Kreps <j...@confluent.io>
> > > Sent: Sunday, December 11, 2016 8:58:53 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
> > >
> > > Hey Michael,
> > >
> > > I'm not quite sure that works as that would translate ALL null
> values to
> > > tombstones, even for non-compacted topics that use null as an
> acceptable
> > > value sent by the producer and expected by the consumer.
> > >
> > > -Jay
> > >
> > > On Sun, Dec 11, 2016 at 3:26 AM, Michael Pearce <
> michael.pea...@ig.com>
> > > wrote:
> > >
> > > > Hi Ewen,
> > > >
> > > > I think the easiest way to show this is with code.
> > > >
> > > > As you can see we keep the existing behaviour for code/binaries
> calling
> > > > the pre-existing constructors, whereby if the value is null the
> > tombstone
> > > > is set to true.
> > > >
> > > > Regards
> > > > Mike
> > > >
> > > >
> > > >
> > > > /**
> > > >  * Creates a record with a specified timestamp to be sent to
> a
> > > > specified topic and partition
> > > >  *
> > > >  * @param topic The topic the record will be appended to
> > > >  * @param partition The partition to which the record should
> be
> > sent
> > > >  * @param timestamp The timestamp of the record
> > > >  * @param tombstone if the record should be treated as a
> tombstone
> > if
> > > > the topic is compacted
> > > >  * @param key The key that will be included in the record
> > > >  * @param value The record contents
> > > >  */
> > > > public ProducerRecord(String topic, Integer partition,
> Boolean
> > > > tombstone, Long timestamp, K key, V value) {
> > > > if (topic == null)
> > > > throw new IllegalArgumentException("Topic cannot be
> > null.");
> > > > if (timestamp != null

Re: [DISCUSS] KIP-90 Remove zkClient dependency from Streams

2016-12-13 Thread Jay Kreps
Ha, least controversial KIP ever. :-)

-Jay

On Tue, Dec 13, 2016 at 10:39 AM, Hojjat Jafarpour 
wrote:

> Hi all,
>
> The following is a KIP for removing zkClient dependency from Streams.
> Please check out the KIP page:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 90+-+Remove+zkClient+dependency+from+Streams
>
> Thanks,
> --Hojjat
>


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-12-13 Thread Jay Kreps
Hey Ismael,

Yeah I think we are both saying the same thing---removing only works if you
have a truly optimal strategy. Actually even dynamically computing a
reasonable default isn't totally obvious (do you set fetch.max.bytes to
equal buffer.memory to try to queue up as much data in the network buffers?
Do you try to limit it to your socket.receive.buffer size so that you can
read it in a single shot?).

Regarding what is being measured, my interpretation was the same as yours.
I was just adding to the previous point that buffer.memory setting would
not be a very close proxy for memory usage. Someone was pointing out that
compression would make this true, and I was just adding that even without
compression the object overhead would lead to a high expansion factor.

-Jay

On Mon, Dec 12, 2016 at 11:53 PM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Jay,
>
> About `max.partition.fetch.bytes`, yes it was an oversight not to lower its
> priority as part of KIP-74 given the existence of `fetch.max.bytes` and the
> fact that we can now make progress in the presence of oversized messages
> independently of either of those settings.
>
> I agree that we should try to set those values automatically based on
> `buffer.memory`, but I am not sure if we can have a truly optimal strategy.
> So, I'd go with reducing the priority to "low" instead of removing
> `fetch.max.bytes` and `max.partition.fetch.bytes` altogether for now. If
> experience in the field tells us that the auto strategy is good enough, we
> can consider removing them (yes, I know, it's unlikely to happen as there
> won't be that much motivation then).
>
> Regarding the "conversion from packed bytes to java objects" comment, that
> raises the question: what are we actually measuring here? From the KIP,
> it's not too clear. My interpretation was that we were not measuring the
> memory usage of the Java objects. In that case, `buffer.memory` seems like
> a reasonable name although perhaps the user's expectation is that we would
> measure the memory usage of the Java objects?
>
> Ismael
>
> On Tue, Dec 13, 2016 at 6:21 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > I think the question is whether we have a truly optimal strategy for
> > deriving the partition- and fetch-level configs from the global setting.
> If
> > we do then we should just get rid of them. If not, then if we can at
> least
> > derive usually good and never terrible settings from the global limit at
> > initialization time maybe we can set them automatically unless the user
> > overrides with an explicit conifg. Even the latter would let us mark it
> low
> > priority which at least takes it off the list of things you have to grok
> to
> > use the consumer which I suspect would be much appreciated by our poor
> > users.
> >
> > Regardless it'd be nice to make sure we get an explanation of the
> > relationships between the remaining memory configs in the KIP and in the
> > docs.
> >
> > I agree that buffer.memory isn't bad.
> >
> > -Jay
> >
> >
> > On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Yeah, that's a good point. Perhaps in retrospect, it would have been
> > better
> > > to define `buffer.memory` first and let `fetch.max.bytes` be based off
> of
> > > it. I like `buffer.memory` since it gives the consumer nice symmetry
> with
> > > the producer and its generic naming gives us some flexibility
> internally
> > > with how we use it. We could still do that I guess, if we're willing to
> > > deprecate `fetch.max.bytes` (one release after adding it!).
> > >
> > > As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is
> still
> > > useful in Kafka Streams, but I agree it makes sense to lower its
> priority
> > > in favor of `fetch.max.bytes`.
> > >
> > > -Jason
> > >
> > > On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:
> > >
> > > > Jason, it's not just decompression but also the conversion from
> packed
> > > > bytes to java objects, right? That can be even larger than the
> > > > decompression blow up. I think this may be okay, the problem may just
> > be
> > > > that the naming is a bit misleading. In the producer you are
> literally
> > > > allocating a buffer of that size, so the name buffer.memory makes
> > sense.
> > > In
> > > > this case it is something more like max.bytes.read.per.poll.call
> > > (terrible
> > > > name, but maybe something like that?).
> > &g

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-13 Thread Jay Kreps
Hey Ismael,

Yes, you are correct, I remember now why didn't do that. I rescind that
suggestion. I still think lazy initialization is more in keeping with what
we've done if feasible.

-Jay

On Mon, Dec 12, 2016 at 11:36 PM, Ismael Juma <ism...@juma.me.uk> wrote:

> Hi Jay,
>
> I like the idea of having a single `init`, but I am not sure about the
> specifics of the metadata initialisation (as Jason alluded to). More
> inline.
>
> On Tue, Dec 13, 2016 at 6:01 AM, Jay Kreps <j...@confluent.io> wrote:
>
> >1. Add a generic init() call which initializes both transactions and
> >metadata
> >
>
> Would this initialise metadata for all topics? One advantage of doing the
> metadata call during `send` is that we only retrieve metadata for the
> subset of topics that you are producing to. For large clusters, retrieving
> the metadata for all the topics is relatively expensive and I think users
> would prefer to avoid that unless there are some concrete benefits. We
> could pass the topics to `init`, but that seems a bit clunky.
>
>
> >2. If you don't call init(), metadata is initialized on the first send
> >(as now)
>
>
> We need to maintain the logic to refresh the metadata on `send` anyway if
> you try to send to a topic that is missing from the metadata (e.g. if it's
> added after the `init` method is called, assuming that we don't expect
> people to call `init` more than once) so that seems fine.
>
>
> > and transactions are lazily initialized at the first beginTransaction()
> > call.
>
>
> I'll leave it to Jason to say if this is feasible. However, if it is, it
> seems like we can just do this and avoid the `init` method altogether?
>
> Ismael
>
> On Tue, Dec 13, 2016 at 6:01 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey Jason/Neha,
> >
> > Yeah, clearly having a mandatory, generic init() method that initializes
> > both transactions and topic metadata would be the ideal solution. This
> > would solve the occasional complaint about blocking behavior during
> > initialization of metadata (or at least shift it to a new complaint about
> > an inability to initialize when the cluster isn't up in test
> environments).
> > The challenge is that we can't do this because it isn't backwards
> > compatible with existing apps that don't call init.
> >
> > The alternative of having an optional generic init() call is a bit odd
> > because to figure out if you need to call it you need to discover what it
> > does, which is not generic, it initializes transactions. We can't really
> > add more logic to init because it only gets invoked by transaction users
> so
> > it doesn't really function as a generic init.
> >
> > What do you think of this solution:
> >
> >1. Add a generic init() call which initializes both transactions and
> >metadata
> >2. If you don't call init(), metadata is initialized on the first send
> >(as now) and transactions are lazily initialized at the first
> >beginTransaction() call.
> >
> > -Jay
> >
> >
> >
> >
> >
> >
> >
> > On Mon, Dec 12, 2016 at 9:17 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > @Neha
> > >
> > >
> > > 1. I think we should consider renaming initTransactions to just init()
> > and
> > > > moving the metadata initialization there. Let's make sure we don't
> add
> > > APIs
> > > > that are relevant to this proposal only. Instead, try to think what
> > we'd
> > > > propose if we were writing the producer from scratch today. I suspect
> > we
> > > > would end up with an init() API that would do the metadata
> > initialization
> > > > as well as the transaction stuff lazily. If so, let's make that
> change
> > > now.
> > >
> > >
> > > I think the only awkwardness with `init()` is that it would probably
> have
> > > to be an optional API for non-transactional usage to support existing
> > code.
> > > I'm also not sure what metadata we can actually initialize at that
> point
> > > since we don't know which topics will be produced to. That said, I'm
> also
> > > not fond of the `initTransactions` name, and we may find other uses
> for a
> > > generic `init()` in the future, so I'm in favor this renaming.
> > >
> > >
> > > > 2. Along the same lines, let's think about the role of each id that
> the
> > > > producer will have and see if everything still makes sense. For
> > i

Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-12-12 Thread Jay Kreps
I think the question is whether we have a truly optimal strategy for
deriving the partition- and fetch-level configs from the global setting. If
we do then we should just get rid of them. If not, then if we can at least
derive usually good and never terrible settings from the global limit at
initialization time maybe we can set them automatically unless the user
overrides with an explicit conifg. Even the latter would let us mark it low
priority which at least takes it off the list of things you have to grok to
use the consumer which I suspect would be much appreciated by our poor
users.

Regardless it'd be nice to make sure we get an explanation of the
relationships between the remaining memory configs in the KIP and in the
docs.

I agree that buffer.memory isn't bad.

-Jay


On Mon, Dec 12, 2016 at 2:56 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Yeah, that's a good point. Perhaps in retrospect, it would have been better
> to define `buffer.memory` first and let `fetch.max.bytes` be based off of
> it. I like `buffer.memory` since it gives the consumer nice symmetry with
> the producer and its generic naming gives us some flexibility internally
> with how we use it. We could still do that I guess, if we're willing to
> deprecate `fetch.max.bytes` (one release after adding it!).
>
> As for `max.partition.fetch.bytes`, it's noted in KIP-74 that it is still
> useful in Kafka Streams, but I agree it makes sense to lower its priority
> in favor of `fetch.max.bytes`.
>
> -Jason
>
> On Sat, Dec 10, 2016 at 2:27 PM, Jay Kreps <j...@confluent.io> wrote:
>
> > Jason, it's not just decompression but also the conversion from packed
> > bytes to java objects, right? That can be even larger than the
> > decompression blow up. I think this may be okay, the problem may just be
> > that the naming is a bit misleading. In the producer you are literally
> > allocating a buffer of that size, so the name buffer.memory makes sense.
> In
> > this case it is something more like max.bytes.read.per.poll.call
> (terrible
> > name, but maybe something like that?).
> >
> > Mickael, I'd second Jason's request for the default and expand on it. We
> > currently have several consumer-related memory
> > settings--max.partition.fetch.bytes, fetch.max.bytes. I don't think it
> is
> > clear today how to set these. For example we mark
> max.partition.fetch.bytes
> > as high importance and fetch.max.bytes as medium, but it seems like it
> > would be the other way around. Can we think this through from the point
> of
> > view of a lazy user? I.e. I have 64MB of space to use for my consumer, in
> > an ideal world I'd say, "hey consumer here is 64MB go use that as
> > efficiently as possible" and not have to tune a bunch of individual
> things
> > with complex relationships. Maybe one or both of the existing settings
> can
> > either be eliminated or at the least marked as low priority and we can
> > infer a reasonable default from the new config your introducing?
> >
> > -jay
> >
> > On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Mickael,
> > >
> > > I think the approach looks good, just a few minor questions:
> > >
> > > 1. The KIP doesn't say what the default value of `buffer.memory` will
> be.
> > > Looks like we use 50MB as the default for `fetch.max.bytes`, so perhaps
> > it
> > > makes sense to set the default based on that. Might also be worth
> > > mentioning somewhere the constraint between the two configs.
> > > 2. To clarify, this limit only affects the uncompressed size of the
> > fetched
> > > data, right? The consumer may still exceed it in order to store the
> > > decompressed record data. We delay decompression until the records are
> > > returned to the user, but because of max.poll.records, we may end up
> > > holding onto the decompressed data from a single partition for a few
> > > iterations. I think this is fine, but probably worth noting in the KIP.
> > > 3. Is there any risk using the MemoryPool that, after we fill up the
> > memory
> > > with fetch data, we can starve the coordinator's connection? Suppose,
> for
> > > example, that we send a bunch of pre-fetches right before returning to
> > the
> > > user. These fetches might return before the next call to poll(), in
> which
> > > case we might not have enough memory to receive heartbeats, which would
> > > block us from sending additional heartbeats until the next call to
> > poll().
> > > Not sure it's a big problem since heartbeats are t

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-12 Thread Jay Kreps
Hey Jason/Neha,

Yeah, clearly having a mandatory, generic init() method that initializes
both transactions and topic metadata would be the ideal solution. This
would solve the occasional complaint about blocking behavior during
initialization of metadata (or at least shift it to a new complaint about
an inability to initialize when the cluster isn't up in test environments).
The challenge is that we can't do this because it isn't backwards
compatible with existing apps that don't call init.

The alternative of having an optional generic init() call is a bit odd
because to figure out if you need to call it you need to discover what it
does, which is not generic, it initializes transactions. We can't really
add more logic to init because it only gets invoked by transaction users so
it doesn't really function as a generic init.

What do you think of this solution:

   1. Add a generic init() call which initializes both transactions and
   metadata
   2. If you don't call init(), metadata is initialized on the first send
   (as now) and transactions are lazily initialized at the first
   beginTransaction() call.

-Jay







On Mon, Dec 12, 2016 at 9:17 PM, Jason Gustafson  wrote:

> @Neha
>
>
> 1. I think we should consider renaming initTransactions to just init() and
> > moving the metadata initialization there. Let's make sure we don't add
> APIs
> > that are relevant to this proposal only. Instead, try to think what we'd
> > propose if we were writing the producer from scratch today. I suspect we
> > would end up with an init() API that would do the metadata initialization
> > as well as the transaction stuff lazily. If so, let's make that change
> now.
>
>
> I think the only awkwardness with `init()` is that it would probably have
> to be an optional API for non-transactional usage to support existing code.
> I'm also not sure what metadata we can actually initialize at that point
> since we don't know which topics will be produced to. That said, I'm also
> not fond of the `initTransactions` name, and we may find other uses for a
> generic `init()` in the future, so I'm in favor this renaming.
>
>
> > 2. Along the same lines, let's think about the role of each id that the
> > producer will have and see if everything still makes sense. For instance,
> > we have quite a few per-producer-instance notions -- client.id, a
> producer
> > id and a transaction.app.id, some set via config and some generated
> > on-the-fly. What role does each play, how do they relate to each other
> and
> > is there an opportunity to get rid of any.
>
>
> The abundance of ids is super annoying. The producer ID is not actually
> exposed in either the producer or consumer, but I'm not sure how successful
> we'll be in hiding its existence from the user (you probably need to know
> about it for debugging and administrative purposes at least). This issue
> has been a continual thorn and I'm not sure I have a great answer. We have
> been tempted to use client.id as the AppID at one point or another, but
> its
> current usage is to have the same value for all producers in an
> application. The lack of an AppID meant that we would have to expose the
> producer ID and the application would be responsible for persisting it. In
> the use cases we looked at, it was simpler to let the application provide
> its own ID through configuration. And in use cases where there was no
> obvious ID to serve as the AppID, it seemed simple enough to let the
> application generate its own. We also looked at removing the producer ID.
> This was discussed somewhere above, but basically the idea is to store the
> AppID in the message set header directly and avoid the mapping to producer
> ID altogether. As long as batching isn't too bad, the impact on total size
> may not be too bad, but we were ultimately more comfortable with a fixed
> size ID.
>
> 3. I think we should definitely consider renaming transaction.app.id to
> > something else. Given that we already have a notion of application.id
> and
> > it represents the entire Streams application, having transaction.app.id
> > that represents a producer instance is confusing. I do understand that,
> for
> > Streams, the user doesn't have to set transaction.app.id as it will
> likely
> > be application.id+taskId (am I understanding that correctly?)
>
>
> Your understanding is correct. The "transaction" prefix was intended to
> make it clear that it was only needed for transactional usage. We've also
> referred to the AppID as a producer "instance ID." This is more suggestive
> of the fact that it needs to be unique within the producers of a particular
> application. Maybe we could drop the "transaction" and use "instance.id"
> or
> "app.instance.id"? Not sure that's any better, but perhaps it avoids the
> confusion with application.id?
>
> Thanks,
> Jason
>
> On Mon, Dec 12, 2016 at 8:37 PM, Jason Gustafson 
> wrote:
>
> > @Becket
> >
> > It has been a pain in many 

Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag

2016-12-11 Thread Jay Kreps
 (or optional schemas in other formats), e.g. [null,
> string], in which case losing the schema truly is losing information
> (whereas null is already the only valid value for a pure null schema).
>
> -Ewen
>
>
> On Sat, Dec 10, 2016 at 9:24 PM, Michael Pearce <michael.pea...@ig.com>
> wrote:
>
> > Hi Jay,
> >
> > Good point this detail is missing in the KIP write up. Ive added this
> now.
> >
> > Essentially simply just upgrading the clients we do not expect any client
> > app code change needed.
> >
> > Cheers
> > Mike
> > 
> > From: Jay Kreps <j...@confluent.io>
> > Sent: Saturday, December 10, 2016 10:51 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
> >
> > Michael,
> >
> > The compatibility section goes through the migration path, but isn't the
> > bigger compatibility issue with existing apps? There are many (probably
> > thousands) of apps in production that use this feature and send null to
> > mean delete. It seems like this would break compatibility with them, and
> > they would have to be rewritten to right?
> >
> > -Jay
> >
> > On Thu, Dec 8, 2016 at 12:12 AM, Michael Pearce <michael.pea...@ig.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > 4) On v3 we honour the tombstone. As such we expect it to be set
> > correctly
> > > as per the KIP.
> > >
> > > 4.1) why would we want to produce an error when its v3? This is the
> exact
> > > purpose to support non-null tombstone’s
> > > 4.2) again here im unclear on the question on the v3, produce request
> we
> > > expect the tombstone flag to be set correctly.
> > >
> > > 4.4) compaction only occurs on compacted topics, the bit makes no
> > > difference and not looked at on un-compacted (time/size based
> eviction).
> > >
> > >
> > > On 06/12/2016, 20:08, "Jun Rao" <j...@confluent.io> wrote:
> > >
> > > Hi, Michael,
> > >
> > > 4. Then, I think I misunderstood this point. Could you document the
> > > following points in the wiki?
> > > 4.1 If producer V3 sets tombstone, but provides a non-null value,
> > does
> > > the
> > > send() get an error or does the producer automatically set the
> value
> > to
> > > null?
> > > 4.2 If producer V3 doesn't set tombstone, but provides a null
> value,
> > > does
> > > the send() get an error or does the producer automatically sets the
> > > tombstone?
> > > 4.3 Does the broker only expect messages that (a) have no tombstone
> > and
> > > non-null value; (b) have tombstone and null value and reject the
> > > messages
> > > with an error code otherwise?
> > > 4.4 Do 4.1, 4.2,  4.3 depend on whether the topic is compacted on
> > not?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Dec 6, 2016 at 10:35 AM, Michael Pearce <
> > michael.pea...@ig.com
> > > >
> > > wrote:
> > >
> > > > Not at all.  This only acts on compacted topics just as what
> occurs
> > > today
> > > >
> > > > Sent using OWA for iPhone
> > > > 
> > > > From: Jun Rao <j...@confluent.io>
> > > > Sent: Tuesday, December 6, 2016 6:25:28 PM
> > > > To: dev@kafka.apache.org
> > > > Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
> > > >
> > > > Hi, Michael,
> > > >
> > > > 4. Hmm, does that mean the new client library can never send a
> null
> > > message
> > > > even to a regular topic? This seems like a change of the existing
> > > behavior.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Tue, Dec 6, 2016 at 9:51 AM, Michael Pearce <
> > > michael.pea...@ig.com>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > Re 4) That's because we expect the tombstone value to be set
> > > correctly if
> > > > > message bit is 2, as such if an older client sends in on old
> > > message the

Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag

2016-12-10 Thread Jay Kreps
Michael,

The compatibility section goes through the migration path, but isn't the
bigger compatibility issue with existing apps? There are many (probably
thousands) of apps in production that use this feature and send null to
mean delete. It seems like this would break compatibility with them, and
they would have to be rewritten to right?

-Jay

On Thu, Dec 8, 2016 at 12:12 AM, Michael Pearce 
wrote:

> Hi Jun,
>
> 4) On v3 we honour the tombstone. As such we expect it to be set correctly
> as per the KIP.
>
> 4.1) why would we want to produce an error when its v3? This is the exact
> purpose to support non-null tombstone’s
> 4.2) again here im unclear on the question on the v3, produce request we
> expect the tombstone flag to be set correctly.
>
> 4.4) compaction only occurs on compacted topics, the bit makes no
> difference and not looked at on un-compacted (time/size based eviction).
>
>
> On 06/12/2016, 20:08, "Jun Rao"  wrote:
>
> Hi, Michael,
>
> 4. Then, I think I misunderstood this point. Could you document the
> following points in the wiki?
> 4.1 If producer V3 sets tombstone, but provides a non-null value, does
> the
> send() get an error or does the producer automatically set the value to
> null?
> 4.2 If producer V3 doesn't set tombstone, but provides a null value,
> does
> the send() get an error or does the producer automatically sets the
> tombstone?
> 4.3 Does the broker only expect messages that (a) have no tombstone and
> non-null value; (b) have tombstone and null value and reject the
> messages
> with an error code otherwise?
> 4.4 Do 4.1, 4.2,  4.3 depend on whether the topic is compacted on not?
>
> Thanks,
>
> Jun
>
> On Tue, Dec 6, 2016 at 10:35 AM, Michael Pearce  >
> wrote:
>
> > Not at all.  This only acts on compacted topics just as what occurs
> today
> >
> > Sent using OWA for iPhone
> > 
> > From: Jun Rao 
> > Sent: Tuesday, December 6, 2016 6:25:28 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
> >
> > Hi, Michael,
> >
> > 4. Hmm, does that mean the new client library can never send a null
> message
> > even to a regular topic? This seems like a change of the existing
> behavior.
> >
> > Thanks,
> >
> > Jun
> >
> > On Tue, Dec 6, 2016 at 9:51 AM, Michael Pearce <
> michael.pea...@ig.com>
> > wrote:
> >
> > > Hi Jun,
> > >
> > > Re 4) That's because we expect the tombstone value to be set
> correctly if
> > > message bit is 2, as such if an older client sends in on old
> message the
> > > message is upcast and the bit is set correctly. And such no longer
> need
> > to
> > > check the value. Mayuresh can you confirm my thinking and
> understanding
> > of
> > > what we've discussed?
> > >
> > > The second point I understand what you're getting at now my
> apologies.
> > Yes
> > > this makes sense to save on touching the message, if we're the
> only kip
> > > going in, in this release.
> > >
> > > Cheers
> > > Mike
> > >
> > > Sent using OWA for iPhone
> > > 
> > > From: Jun Rao 
> > > Sent: Tuesday, December 6, 2016 5:22:13 PM
> > > To: dev@kafka.apache.org
> > > Subject: Re: [VOTE] KIP-87 - Add Compaction Tombstone Flag
> > >
> > > Hi, Michael,
> > >
> > > 4. Is this updated in the wiki? The text "If the magic byte on
> message is
> > > 2, the broker should use the tombstone bit for log compaction."
> doesn't
> > > seem to have changed.
> > >
> > > 2. My point is that if we change the message format just for this
> KIP, we
> > > should consider whether it's worth optimizing the down conversion
> path
> > > (i.e., decide whether a conversion is needed by just looking at the
> > > tombstone bit in the wrapper message) since tombstone will be used
> > rarely.
> > > However, if the message format change here is combined with other
> KIPs,
> > > then this optimization likely won't be needed. The latter probably
> makes
> > > the code simpler. Jiangjie, Mayuresh, what do you think?
> > >
> > > Other than those, +1 from me,
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Tue, Dec 6, 2016 at 8:54 AM, Michael Pearce <
> michael.pea...@ig.com>
> > > wrote:
> > >
> > > > Hi Jun
> > > >
> > > > do we have your vote on this now?
> > > >
> > > > Any other concerns?
> > > >
> > > > Cheers
> > > > Mike
> > > >
> > > > Sent using OWA for iPhone
> > > > 
> > > > From: Michael Pearce 
> > > > Sent: 

Re: [VOTE]: KIP-97: The client compatibility KIP

2016-12-10 Thread Jay Kreps
+1 (binding)

-Jay

On Wed, Dec 7, 2016 at 9:17 AM, Colin McCabe  wrote:

> Hi all,
>
> I heard that the VOTE and DISCUSS threads for the KIP-97 discussion
> appeared to be in the same email thread for some people using gmail.  So
> I'm reposting in hopes of getting a separate email thread this time for
> those viewers. :)
>
> I'd like to start voting on KIP-97
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 97%3A+Improved+Kafka+Client+RPC+Compatibility+Policy
> ).
>
> The discussion thread can be found here:
> https://www.mail-archive.com/dev@kafka.apache.org/msg60955.html
>
> Thanks for your feedback.
>
> best,
> Colin McCabe
>


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-12-10 Thread Jay Kreps
Jason, it's not just decompression but also the conversion from packed
bytes to java objects, right? That can be even larger than the
decompression blow up. I think this may be okay, the problem may just be
that the naming is a bit misleading. In the producer you are literally
allocating a buffer of that size, so the name buffer.memory makes sense. In
this case it is something more like max.bytes.read.per.poll.call (terrible
name, but maybe something like that?).

Mickael, I'd second Jason's request for the default and expand on it. We
currently have several consumer-related memory
settings--max.partition.fetch.bytes, fetch.max.bytes. I don't think it is
clear today how to set these. For example we mark max.partition.fetch.bytes
as high importance and fetch.max.bytes as medium, but it seems like it
would be the other way around. Can we think this through from the point of
view of a lazy user? I.e. I have 64MB of space to use for my consumer, in
an ideal world I'd say, "hey consumer here is 64MB go use that as
efficiently as possible" and not have to tune a bunch of individual things
with complex relationships. Maybe one or both of the existing settings can
either be eliminated or at the least marked as low priority and we can
infer a reasonable default from the new config your introducing?

-jay

On Fri, Dec 9, 2016 at 2:08 PM, Jason Gustafson  wrote:

> Hi Mickael,
>
> I think the approach looks good, just a few minor questions:
>
> 1. The KIP doesn't say what the default value of `buffer.memory` will be.
> Looks like we use 50MB as the default for `fetch.max.bytes`, so perhaps it
> makes sense to set the default based on that. Might also be worth
> mentioning somewhere the constraint between the two configs.
> 2. To clarify, this limit only affects the uncompressed size of the fetched
> data, right? The consumer may still exceed it in order to store the
> decompressed record data. We delay decompression until the records are
> returned to the user, but because of max.poll.records, we may end up
> holding onto the decompressed data from a single partition for a few
> iterations. I think this is fine, but probably worth noting in the KIP.
> 3. Is there any risk using the MemoryPool that, after we fill up the memory
> with fetch data, we can starve the coordinator's connection? Suppose, for
> example, that we send a bunch of pre-fetches right before returning to the
> user. These fetches might return before the next call to poll(), in which
> case we might not have enough memory to receive heartbeats, which would
> block us from sending additional heartbeats until the next call to poll().
> Not sure it's a big problem since heartbeats are tiny, but might be worth
> thinking about.
>
> Thanks,
> Jason
>
>
> On Fri, Dec 2, 2016 at 4:31 AM, Mickael Maison 
> wrote:
>
> > It's been a few days since the last comments. KIP-72 vote seems to
> > have passed so if I don't get any new comments I'll start the vote on
> > Monday.
> > Thanks
> >
> > On Mon, Nov 14, 2016 at 6:25 PM, radai 
> wrote:
> > > +1 - there's is a need for an effective way to control kafka memory
> > > consumption - both on the broker and on clients.
> > > i think we could even reuse the exact same param name -
> > *queued.max.bytes *-
> > > as it would serve the exact same purpose.
> > >
> > > also (and again its the same across the broker and clients) this bound
> > > should also cover decompression, at some point.
> > > the problem with that is that to the best of my knowledge the current
> > wire
> > > protocol does not declare the final, uncompressed size of anything up
> > front
> > > - all we know is the size of the compressed buffer. this may require a
> > > format change in the future to properly support?
> > >
> > > On Mon, Nov 14, 2016 at 10:03 AM, Mickael Maison <
> > mickael.mai...@gmail.com>
> > > wrote:
> > >
> > >> Thanks for all the replies.
> > >>
> > >> I've updated the KIP:
> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> 81%3A+Bound+Fetch+memory+usage+in+the+consumer
> > >> The main point is to selectively read from sockets instead of
> > >> throttling FetchRequests sends. I also mentioned it will be reusing
> > >> the MemoryPool implementation created in KIP-72 instead of adding
> > >> another memory tracking method.
> > >>
> > >> Please have another look. As always, comments are welcome !
> > >>
> > >> On Thu, Nov 10, 2016 at 2:47 AM, radai 
> > wrote:
> > >> > selectively reading from sockets achieves memory control (up to and
> > not
> > >> > including talk of (de)compression)
> > >> >
> > >> > this is exactly what i (also, even mostly) did for kip-72 - which i
> > hope
> > >> in
> > >> > itself should be a reason to think about both KIPs at the same time
> > >> because
> > >> > the changes will be similar (at least in intent) and might result in
> > >> > duplicated effort.
> > >> >
> > >> > a pool 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-09 Thread Jay Kreps
Hey Michael,

Yeah, I don't think you need to go into the details of whatever you guys
have. I think several people in the thread said "let's do XA transactions
too!" Obviously in a world where features were free and always worked
perfectly we would! I've probably talked to about 100 people about their
use of XA transactions in different systems and my observation has been (a)
they are a bit of an operational nightmare, (b) the use cases i've
understood don't actually require full XA transactions they actually
require a much weaker and easier to guarantee property. The result is you
pay a big complexity cost for a guarantee much stronger than what you
wanted. My sense is that this opinion is broadly shared by the distributed
systems community at large and by Kafka folks in particular.

I'm a contrarian so I think it is great not to be too swayed by "common
wisdom" though. Five years ago there was a consensus that distributed
transactions were too hard to implement in an operationally sound way,
which i think was not correct, so the bad reputation for cross-system
transactions may be equally wrong!

To build a compelling case this is wrong I think two things need to be done:

   1. Build a case that there are a large/important set of use cases that
   cannot be solved with two independent transactions (as i described), and
   that these use cases are things Kafka should be able to do.
   2. Come up with the concrete extensions to the KIP-98 proposal that
   would enable an operationally sound implementation for pluggable
   multi-system XA.

-Jay



On Fri, Dec 9, 2016 at 10:25 AM, Michael Pearce <michael.pea...@ig.com>
wrote:

> Hi Jay,
>
> I can't go too deep into exact implantation due to no NDA. So apologies
> here.
>
> Essentially we have multiple processes each owning selection of accounts
> so on general flows an action for an account just needs to be managed local
> to the owning node, happy days ever change is handled as a tick tock change.
>
> Unfortunately when a transfer occurs we need the two processes to
> co-ordinate their transaction, we also need to ensure both don't continue
> other actions/changesl, we do this using a data grid technology. This grid
> technology supports transaction manager that we couple into currently our
> jms provider which supports full XA transactions as such we can manage the
> production of the change messages out the system transactionally as well as
> the in grid state.
>
> The obvious arguement here is should we even look to move this flow off
> JMS then. We prob shouldn't nor will do this.
>
> The point is that I think saying Kafka supports transactions but then not
> supporting it as per the traditional sense leads to developers expecting
> similar behaviour and will cause issues in prod when they find it doesn't
> work as they're used to.
>
> As my other response earlier, is there a better name to describe this
> feature, if we're not implementing transactions to the traditional
> transaction expected, to avoid this confusion?
>
>
> Sent using OWA for iPhone
> 
> From: Jay Kreps <j...@confluent.io>
> Sent: Friday, December 9, 2016 6:08:07 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional
> Messaging
>
> Hey Michael,
>
> Doesn't that example have more to do with applying the update against two
> rows in a single transaction? That is, clearly the write to Kafka needs to
> be "transactional" and the write to the destination needs to be
> transactional, but it's not clear to me that you need isolation that spans
> both operations. Can you dive into the system architecture a bit more and
> explain why Kafka needs to participate in the same transaction as the
> destination system?
>
> -Jay
>
> On Thu, Dec 8, 2016 at 10:19 PM, Michael Pearce <michael.pea...@ig.com>
> wrote:
>
> > Usecase in IG:
> >
> > Fund transfer between accounts. When we debit one account and fund
> another
> > we must ensure the records to both occur as an acid action, and as a
> single
> > transaction.
> >
> > Today we achieve this because we have jms, as such we can do the actions
> > needed in an xa transaction across both the accounts. To move this flow
> to
> > Kafka we would need support of XA transaction.
> >
> >
> >
> > Sent using OWA for iPhone
> > 
> > From: Michael Pearce <michael.pea...@ig.com>
> > Sent: Friday, December 9, 2016 6:09:06 AM
> > To: dev@kafka.apache.org
> > Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional
> > Messaging
> >
> > Hi Jay,
> >
> > For me having

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-09 Thread Jay Kreps
With respect to the naming, I think it makes sense for two reasons.

   1. The begin/commit/rollback semantics are what people expect from
   transactions, i.e. it passes the intuitive "gut check" meaning people have.
   If you make up a new name it will likely not convey the expectation.
   2. There is a strong analogy between logs and tables and this feature
   enables ACID-like usage of the log in modifying a table.

I'll dive into the later. What does ACID mean for a log? Well it is
semi-well-defined for a table, what would the implications be for the
equivalent log representation?

Here is the analogy:

   - *Atomic* - This is straight-forward, a set of updates are either all
   published or all not published.
   - *Consistent* - This one is not very well defined in ACID, it isn't
   always as simple as linearizability. It alternately means either (a) a
   transaction started now sees all past committed transactions, (b) the
   database checks various DB-specific things like foreign-key constraints, or
   (c) some undefined notion of application correctness without any particular
   invariant. I think the most sane interpretation is based on (a) and means
   that the consumer sees transactions in commit order. We don't try to
   guarantee this, but for a log the reader controls the order of processing
   so this is possible. We could add future features to do this reordering for
   people as a convenience.
   - *Isolated* - In ACID this means a reader doesn't see the results of
   uncommitted transactions. In a log this means you get complete transactions
   all at once rather than getting half a transaction. This is primarily up to
   you in how you use the data you consume.
   - *Durable* - This falls out of Kafka's replication.

I'm less worried about confusion with other messaging systems. Kafka is
genuinely different in a number of areas and it is worth people
understanding that difference.

-Jay

On Fri, Dec 9, 2016 at 10:25 AM, Michael Pearce <michael.pea...@ig.com>
wrote:

> Hi Jay,
>
> I can't go too deep into exact implantation due to no NDA. So apologies
> here.
>
> Essentially we have multiple processes each owning selection of accounts
> so on general flows an action for an account just needs to be managed local
> to the owning node, happy days ever change is handled as a tick tock change.
>
> Unfortunately when a transfer occurs we need the two processes to
> co-ordinate their transaction, we also need to ensure both don't continue
> other actions/changesl, we do this using a data grid technology. This grid
> technology supports transaction manager that we couple into currently our
> jms provider which supports full XA transactions as such we can manage the
> production of the change messages out the system transactionally as well as
> the in grid state.
>
> The obvious arguement here is should we even look to move this flow off
> JMS then. We prob shouldn't nor will do this.
>
> The point is that I think saying Kafka supports transactions but then not
> supporting it as per the traditional sense leads to developers expecting
> similar behaviour and will cause issues in prod when they find it doesn't
> work as they're used to.
>
> As my other response earlier, is there a better name to describe this
> feature, if we're not implementing transactions to the traditional
> transaction expected, to avoid this confusion?
>
>
> Sent using OWA for iPhone
> 
> From: Jay Kreps <j...@confluent.io>
> Sent: Friday, December 9, 2016 6:08:07 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional
> Messaging
>
> Hey Michael,
>
> Doesn't that example have more to do with applying the update against two
> rows in a single transaction? That is, clearly the write to Kafka needs to
> be "transactional" and the write to the destination needs to be
> transactional, but it's not clear to me that you need isolation that spans
> both operations. Can you dive into the system architecture a bit more and
> explain why Kafka needs to participate in the same transaction as the
> destination system?
>
> -Jay
>
> On Thu, Dec 8, 2016 at 10:19 PM, Michael Pearce <michael.pea...@ig.com>
> wrote:
>
> > Usecase in IG:
> >
> > Fund transfer between accounts. When we debit one account and fund
> another
> > we must ensure the records to both occur as an acid action, and as a
> single
> > transaction.
> >
> > Today we achieve this because we have jms, as such we can do the actions
> > needed in an xa transaction across both the accounts. To move this flow
> to
> > Kafka we would need support of XA transaction.
> >
> >
> >
> > Sent using OWA for iPhone
>

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-09 Thread Jay Kreps
Hey Michael,

Doesn't that example have more to do with applying the update against two
rows in a single transaction? That is, clearly the write to Kafka needs to
be "transactional" and the write to the destination needs to be
transactional, but it's not clear to me that you need isolation that spans
both operations. Can you dive into the system architecture a bit more and
explain why Kafka needs to participate in the same transaction as the
destination system?

-Jay

On Thu, Dec 8, 2016 at 10:19 PM, Michael Pearce <michael.pea...@ig.com>
wrote:

> Usecase in IG:
>
> Fund transfer between accounts. When we debit one account and fund another
> we must ensure the records to both occur as an acid action, and as a single
> transaction.
>
> Today we achieve this because we have jms, as such we can do the actions
> needed in an xa transaction across both the accounts. To move this flow to
> Kafka we would need support of XA transaction.
>
>
>
> Sent using OWA for iPhone
> 
> From: Michael Pearce <michael.pea...@ig.com>
> Sent: Friday, December 9, 2016 6:09:06 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional
> Messaging
>
> Hi Jay,
>
> For me having an XA transaction allows for ensuring ACID across my
> application.
>
> I believe it is part of the JMS api, and obviously JMS still is in
> enterprise very widely adopted for Messaging transport , so obviously to
> say it isn't widely used i think is ignoring a whole range of users. Like
> wise I believe frameworks like spring etc fully support it more evidence of
> its wide adoption.
>
> On this note personally we try to avoid transactions entirely in our flows
> for performance and simplicity. but we do alas unfortunately have one or
> two places we cannot ignore it.
>
> Cheers
> Mike
>
> Sent using OWA for iPhone
> 
> From: Jay Kreps <j...@confluent.io>
> Sent: Thursday, December 8, 2016 11:25:53 PM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional
> Messaging
>
> Hey Edoardo,
>
> For (3) can you outline what you think the benefit and use cases for a more
> general cross-system XA feature would be an what changes to the proposal
> would be required to enable it? When I have asked people who wanted
> cross-system XA in the past what they wanted it for, I haven't really
> gotten any answers that made sense. Every person really wanted something
> that would be better solved by a transactional (or idempotent) write to
> Kafka followed by an independent transactional (or idempotent) consumption
> (which this proposal enables). For the use cases they described tying these
> two things together had no advantage and many disadvantages.
>
> I have one use case which would be accomplished by cross-system XA which is
> allowing the producer to block on the synchronous processing of the message
> by (all? some?) consumers. However I'm not convinced that cross-system XA
> is the best solution to this problem, and I'm also not convinced this is an
> important problem to solve. But maybe you have something in mind here.
>
> -Jay
>
>
>
> On Thu, Dec 8, 2016 at 1:15 PM, Edoardo Comar <eco...@uk.ibm.com> wrote:
>
> > Hi,
> > thanks, very interesting KIP ... I haven't fully digested it yet.
> >
> > We have many users who choose not to use the Java client,  so I have
> > concerns about the added complexity in developing the clients.
> > A few questions.
> >
> > 1 - is mixing transactional and non transactional messages on the *same
> > topic-partition* really a requirement ?
> > What use case does it satisfy?
> >
> > 2 - I guess some clients may only be interested to implement the producer
> > idempotency.
> > It's not clear how they could be implemented without having to add the
> > transaction capabilities.
> > As others on this list have said, I too would like to see idempotency as
> a
> > more basic feature, on top which txns can be built.
> >
> > 3 - The KIP seems focused on a use case where consumption from a topic
> and
> > subsequent production are part of the producer transaction.
> >
> > It'd be great to see a way to extend the producer transaction to include
> > additional transactional resources,
> > so that the consumption from another topic just becomes a special case of
> > a more general "distributed" txn.
> >
> > Edo
> > --
> > Edoardo Comar
> > IBM MessageHub
> > eco...@uk.ibm.com
> > IBM UK 

Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-08 Thread Jay Kreps
Hey Edoardo,

For (3) can you outline what you think the benefit and use cases for a more
general cross-system XA feature would be an what changes to the proposal
would be required to enable it? When I have asked people who wanted
cross-system XA in the past what they wanted it for, I haven't really
gotten any answers that made sense. Every person really wanted something
that would be better solved by a transactional (or idempotent) write to
Kafka followed by an independent transactional (or idempotent) consumption
(which this proposal enables). For the use cases they described tying these
two things together had no advantage and many disadvantages.

I have one use case which would be accomplished by cross-system XA which is
allowing the producer to block on the synchronous processing of the message
by (all? some?) consumers. However I'm not convinced that cross-system XA
is the best solution to this problem, and I'm also not convinced this is an
important problem to solve. But maybe you have something in mind here.

-Jay



On Thu, Dec 8, 2016 at 1:15 PM, Edoardo Comar  wrote:

> Hi,
> thanks, very interesting KIP ... I haven't fully digested it yet.
>
> We have many users who choose not to use the Java client,  so I have
> concerns about the added complexity in developing the clients.
> A few questions.
>
> 1 - is mixing transactional and non transactional messages on the *same
> topic-partition* really a requirement ?
> What use case does it satisfy?
>
> 2 - I guess some clients may only be interested to implement the producer
> idempotency.
> It's not clear how they could be implemented without having to add the
> transaction capabilities.
> As others on this list have said, I too would like to see idempotency as a
> more basic feature, on top which txns can be built.
>
> 3 - The KIP seems focused on a use case where consumption from a topic and
> subsequent production are part of the producer transaction.
>
> It'd be great to see a way to extend the producer transaction to include
> additional transactional resources,
> so that the consumption from another topic just becomes a special case of
> a more general "distributed" txn.
>
> Edo
> --
> Edoardo Comar
> IBM MessageHub
> eco...@uk.ibm.com
> IBM UK Ltd, Hursley Park, SO21 2JN
>
> IBM United Kingdom Limited Registered in England and Wales with number
> 741598 Registered office: PO Box 41, North Harbour, Portsmouth, Hants. PO6
> 3AU
>
>
>
> From:   Guozhang Wang 
> To: "dev@kafka.apache.org" 
> Date:   30/11/2016 22:20
> Subject:[DISCUSS] KIP-98: Exactly Once Delivery and Transactional
> Messaging
>
>
>
> Hi all,
>
> I have just created KIP-98 to enhance Kafka with exactly once delivery
> semantics:
>
> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> >*
>
> This KIP adds a transactional messaging mechanism along with an idempotent
> producer implementation to make sure that 1) duplicated messages sent from
> the same identified producer can be detected on the broker side, and 2) a
> group of messages sent within a transaction will atomically be either
> reflected and fetchable to consumers or not as a whole.
>
> The above wiki page provides a high-level view of the proposed changes as
> well as summarized guarantees. Initial draft of the detailed
> implementation
> design is described in this Google doc:
>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8
>
>
> We would love to hear your comments and suggestions.
>
> Thanks,
>
> -- Guozhang
>
>
>
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598.
> Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-06 Thread Jay Kreps
t's a fair point. You may still be able to do some merging if
> > adjacent message sets have the same PID, but the potential savings might
> > not be worth the cost of implementation. My gut feeling is that merging
> > message sets from different producers may not be a great idea anyway
> (you'd
> > have to accept the fact that you always need "deep iteration" to find the
> > PIDs contained within the message set), but I haven't thought a ton about
> > it. Ultimately we'll have to decide if the potential for savings in the
> > future is worth some loss in efficiency now (for what it's worth, I think
> > the work that Ben has been looking at also hopes to bundle some more
> > information into the message set header).
> >
> > On a purely pragmatic development level, after spending a ton of recent
> > time working with that code, I can say that the benefit of having a
> > conceptually simpler message format is huge. It allows you to converge
> the
> > paths for validation of message sets on the broker, for example.
> Currently,
> > we pretty much need two separate paths everywhere we process messages. It
> > can be tricky just to tell if the message you're dealing with is the
> inner
> > or outer message, and whether it matters or not. Also, the fact that the
> > inner and outer messages share common fields makes bugs like KAFKA-4298
> > <https://issues.apache.org/jira/browse/KAFKA-4298> possible. The risk of
> > these bugs is much smaller when you can clearly separate the fields
> allowed
> > in the message set header and those in the messages.
> >
> >
> > Thanks,
> > Jason
> >
> > On Thu, Dec 1, 2016 at 8:19 PM, Jay Kreps <j...@confluent.io> wrote:
> >
> > > Looks great!
> > >
> > > A few questions:
> > >
> > >1. What is the relationship between transaction.app.id and the
> > existing
> > >config application.id in streams?
> > >2. The initTransactions() call is a little annoying. Can we get rid
> of
> > >that and call it automatically if you set a transaction.app.id when
> > we
> > >do the first message send as we do with metadata? Arguably we should
> > > have
> > >included a general connect() or init() call in the producer, but
> given
> > > that
> > >we didn't do this it seems weird that the cluster metadata
> initializes
> > >automatically on demand and the transaction metadata doesn't.
> > >3. The equivalent concept of what we call "fetch.mode" in databases
> is
> > >called "isolation level" and takes values like "serializable", "read
> > >committed", "read uncommitted". Since we went with transaction as
> the
> > > name
> > >for the thing in between the begin/commit might make sense to use
> this
> > >terminology for the concept and levels? I think the behavior we are
> > >planning is "read committed" and the alternative re-ordering
> behavior
> > is
> > >equivalent to "serializable"?
> > >4. Can the PID be made 4 bytes if we handle roll-over gracefully? 2
> > >billion concurrent producers should be enough for anyone, right?
> > >5. One implication of factoring out the message set seems to be you
> > >can't ever "repack" messages to improve compression beyond what is
> > done
> > > by
> > >the producer. We'd talked about doing this either by buffering when
> > > writing
> > >or during log cleaning. This isn't a show stopper but I think one
> > >implication is that we won't be able to do this. Furthermore with
> log
> > >cleaning you'd assume that over time ALL messages would collapse
> down
> > > to a
> > >single wrapper as compaction removes the others.
> > >
> > > -Jay
> > >
> > > On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang <wangg...@gmail.com>
> > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have just created KIP-98 to enhance Kafka with exactly once
> delivery
> > > > semantics:
> > > >
> > > > *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
> > > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
> > > >
> > > > This KIP adds a transactional messaging mechanism along with an
> > > idempotent
> > > > producer implementation to make sure that 1) duplicated messages sent
> > > from
> > > > the same identified producer can be detected on the broker side, and
> > 2) a
> > > > group of messages sent within a transaction will atomically be either
> > > > reflected and fetchable to consumers or not as a whole.
> > > >
> > > > The above wiki page provides a high-level view of the proposed
> changes
> > as
> > > > well as summarized guarantees. Initial draft of the detailed
> > > implementation
> > > > design is described in this Google doc:
> > > >
> > > > https://docs.google.com/document/d/11Jqy_
> > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> > > > 0wSw9ra8
> > > >
> > > >
> > > > We would love to hear your comments and suggestions.
> > > >
> > > > Thanks,
> > > >
> > > > -- Guozhang
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-95: Incremental Batch Processing for Kafka Streams

2016-12-05 Thread Jay Kreps
I'd like to second the discouragement of adding a new topic per job. We
went down this path in Samza and I think the result was quite a mess. You
had to read the full topic every time a job started and so it added a lot
of overhead and polluted the topic space.

What if we did the following:

   1. Use timestamp instead of offset
   2. Store the "stopping timestamp" in the metadata field associated with
   the existing offset storage mechanism
   3. Don't worry about fully processing the entire DAG. After all,
   partially processing a tuple isn't much different from not processing it,
   and in any case the stopping point is a heuristic so no point in being
   overly precise here.

Probably I'm missing something, though, I haven't thought through the
implications of using time instead of offset.

-Jay

On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax 
wrote:

> Hi all,
>
> I want to start a discussion about KIP-95:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>
> Looking forward to your feedback.
>
>
> -Matthias
>
>
>


Re: [DISCUSS] KIP-98: Exactly Once Delivery and Transactional Messaging

2016-12-01 Thread Jay Kreps
Looks great!

A few questions:

   1. What is the relationship between transaction.app.id and the existing
   config application.id in streams?
   2. The initTransactions() call is a little annoying. Can we get rid of
   that and call it automatically if you set a transaction.app.id when we
   do the first message send as we do with metadata? Arguably we should have
   included a general connect() or init() call in the producer, but given that
   we didn't do this it seems weird that the cluster metadata initializes
   automatically on demand and the transaction metadata doesn't.
   3. The equivalent concept of what we call "fetch.mode" in databases is
   called "isolation level" and takes values like "serializable", "read
   committed", "read uncommitted". Since we went with transaction as the name
   for the thing in between the begin/commit might make sense to use this
   terminology for the concept and levels? I think the behavior we are
   planning is "read committed" and the alternative re-ordering behavior is
   equivalent to "serializable"?
   4. Can the PID be made 4 bytes if we handle roll-over gracefully? 2
   billion concurrent producers should be enough for anyone, right?
   5. One implication of factoring out the message set seems to be you
   can't ever "repack" messages to improve compression beyond what is done by
   the producer. We'd talked about doing this either by buffering when writing
   or during log cleaning. This isn't a show stopper but I think one
   implication is that we won't be able to do this. Furthermore with log
   cleaning you'd assume that over time ALL messages would collapse down to a
   single wrapper as compaction removes the others.

-Jay

On Wed, Nov 30, 2016 at 2:19 PM, Guozhang Wang  wrote:

> Hi all,
>
> I have just created KIP-98 to enhance Kafka with exactly once delivery
> semantics:
>
> *https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 98+-+Exactly+Once+Delivery+and+Transactional+Messaging
>  98+-+Exactly+Once+Delivery+and+Transactional+Messaging>*
>
> This KIP adds a transactional messaging mechanism along with an idempotent
> producer implementation to make sure that 1) duplicated messages sent from
> the same identified producer can be detected on the broker side, and 2) a
> group of messages sent within a transaction will atomically be either
> reflected and fetchable to consumers or not as a whole.
>
> The above wiki page provides a high-level view of the proposed changes as
> well as summarized guarantees. Initial draft of the detailed implementation
> design is described in this Google doc:
>
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF
> 0wSw9ra8
>
>
> We would love to hear your comments and suggestions.
>
> Thanks,
>
> -- Guozhang
>


[jira] [Resolved] (KAFKA-1941) Timeout connections in the clients

2016-11-17 Thread Jay Kreps (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-1941?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jay Kreps resolved KAFKA-1941.
--
Resolution: Duplicate

Yes, I think this is resolved by KAFKA-1282.

> Timeout connections in the clients
> --
>
> Key: KAFKA-1941
> URL: https://issues.apache.org/jira/browse/KAFKA-1941
> Project: Kafka
>  Issue Type: Improvement
>    Reporter: Jay Kreps
>
> Currently the clients never close their connections due to inactivity. It 
> would be nice to have some idle time period after which the client would 
> close a connection.
> This should probably be implemented in Selector.java.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: [VOTE] KIP-72 - Allow putting a bound on memory consumed by Incoming requests

2016-11-11 Thread Jay Kreps
Hey Radai,

+1 on deprecating and eventually removing the old config. The intention was
absolutely bounding memory usage. I think having two ways of doing this,
one that gives a crisp bound on memory and one that is hard to reason about
is pretty confusing. I think people will really appreciate having one
config which instead lets them directly control the thing they actually
care about (memory).

I also want to second Jun's concern on the complexity of the self-GCing
memory pool. I wrote the memory pool for the producer. In that area the
pooling of messages is the single biggest factor in performance of the
client so I believed it was worth some sophistication/complexity if there
was performance payoff. All the same, the complexity of that code has made
it VERY hard to keep correct (it gets broken roughly every other time
someone makes a change). Over time I came to feel a lot less proud of my
cleverness. I learned something interesting reading your self-GCing memory
pool, but I wonder if the complexity is worth the payoff in this case?

Philosophically we've tried really hard to avoid needlessly "pluggable"
implementations. That is, when there is a temptation to give a config that
plugs in different Java classes at run time for implementation choices, we
should instead think of how to give the user the good behavior
automatically. I think the use case for configuring a the GCing pool would
be if you discovered a bug in which memory leaked. But this isn't something
the user should have to think about right? If there is a bug we should find
and fix it.

-Jay

On Fri, Nov 11, 2016 at 9:21 AM, radai  wrote:

> jun's #1 + rajini's #11 - the new config param is to enable changing the
> pool implentation class. as i said in my response to jun i will make the
> default pool impl be the simple one, and this param is to allow a user
> (more likely a dev) to change it.
> both the simple pool and the "gc pool" make basically just an
> AtomicLong.get() + (hashmap.put for gc) calls before returning a buffer.
> there is absolutely no dependency on GC times in allocating (or not). the
> extra background thread in the gc pool is forever asleep unless there are
> bugs (==leaks) so the extra cost is basically nothing (backed by
> benchmarks). let me re-itarate again - ANY BUFFER ALLOCATED MUST ALWAYS BE
> RELEASED - so the gc pool should not rely on gc for reclaiming buffers. its
> a bug detector, not a feature and is definitely not intended to hide bugs -
> the exact opposite - its meant to expose them sooner. i've cleaned up the
> docs to avoid this confusion. i also like the fail on leak. will do.
> as for the gap between pool size and heap size - thats a valid argument.
> may allow also sizing the pool as % of heap size? so queued.max.bytes =
> 100 for 1MB and queued.max.bytes = 0.25 for 25% of available heap?
>
> jun's 2.2 - queued.max.bytes + socket.request.max.bytes still holds,
> assuming the ssl-related buffers are small. the largest weakness in this
> claim has to do with decompression rather than anything ssl-related. so yes
> there is an O(#ssl connections * sslEngine packet size) component, but i
> think its small. again - decompression should be the concern.
>
> rajini's #13 - interesting optimization. the problem is there's no knowing
> in advance what the _next_ request to come out of a socket is, so this
> would mute just those sockets that are 1. mutable and 2. have a
> buffer-demanding request for which we could not allocate a buffer. downside
> is that as-is this would cause the busy-loop on poll() that the mutes were
> supposed to prevent - or code would need to be added to ad-hocmute a
> connection that was so-far unmuted but has now generated a memory-demanding
> request?
>
>
>
> On Fri, Nov 11, 2016 at 5:02 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
> > Radai,
> >
> > 11. The KIP talks about a new server configuration parameter
> > *memory.pool.class.name
> >  *which is not in the implementation. Is
> it
> > still the case that the pool will be configurable?
> >
> > 12. Personally I would prefer not to have a garbage collected pool that
> > hides bugs as well. Apart from the added code complexity and extra thread
> > to handle collections, I am also concerned about the non-deterministic
> > nature of GC timings. The KIP introduces delays in processing requests
> > based on the configuration parameter *queued.max.bytes. *This in
> unrelated
> > to the JVM heap size and hence pool can be full when there is no pressure
> > on the JVM to garbage collect. The KIP does not prevent other timeouts in
> > the broker (eg. consumer session timeout) because it is relying on the
> pool
> > to be managed in a deterministic, timely manner. Since a garbage
> collected
> > pool cannot provide that guarantee, wouldn't it be better to run tests
> with
> > a GC-pool that perhaps fails with a fatal error if it encounters a buffer
> > that 

Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-11-02 Thread Jay Kreps
Hey Radai,

I think there are a couple discussions here. The first is about what is the
interface to the user. The other is about what is exposed in the protocol,
and implementation details of reading requests. I strongly agree with
giving the user a simple "use X MB of memory" config and we calculate
everything else off of that is really ideal. 99.9% of the time that is all
you would care about. We often can't be perfect in this bound, but as long
as we're close it is fine. I don't think this necessarily implies using a
pool as in the producer. There may be an opportunity to reuse memory, which
may or may not help performance, but last i checked we cached a bunch of
deserialized records too which can't really be reused easily. All we really
need to do, I think, is bound the bytes read per user-level poll call and
stop early when the limit is reached, right?

I'm also a big fan of simplifying config. If you think there are other
areas we could rationalize, I think it'd be good to explore those too. I
think the issue we always struggle with is that there are areas where you
need fine grained control. Our current approach is to try to manage that
with the importance level marking of the configs.

-Jay



On Wed, Nov 2, 2016 at 10:36 AM, Gwen Shapira <g...@confluent.io> wrote:

> +1
>
> On Wed, Nov 2, 2016 at 10:34 AM, radai <radai.rosenbl...@gmail.com> wrote:
>
> > In my opinion a lot of kafka configuration options were added using the
> > "minimal diff" approach, which results in very nuanced and complicated
> > configs required to indirectly achieve some goal. case in point -
> timeouts.
> >
> > The goal here is to control the memory requirement. the 1st config was
> max
> > size of a single request, now the proposal is to control the number of
> > those in flight - which is inaccurate (you dont know the actual size and
> > must over-estimate), would have an impact on throughput in case of
> > over-estimation, and also fails to completely achieve the goal (what
> about
> > decompression?)
> >
> > I think a memory pool in combination with Jay's proposal to only pick up
> > from socket conditionally when memory is available is the correct
> approach
> > - it deals with the problem directly and would result in a simler and
> more
> > understandable configuration (a single property for max memory
> > consumption).
> >
> > in the future the accuracy of the limit can be improved by, for example,
> > declaring both the compressed _AND UNCOMPRESSED_ sizes up front, so that
> we
> > can pick up from socket when we have enough memory to decompress as well
> -
> > this would obviously be a wire format change and outside the scope here,
> > but my point is that it could be done without adding any new configs)
> >
> > On Mon, Oct 31, 2016 at 10:25 AM, Joel Koshy <jjkosh...@gmail.com>
> wrote:
> >
> > > Agreed with this approach.
> > > One detail to be wary of is that since we multiplex various other
> > requests
> > > (e.g., heartbeats, offset commits, metadata, etc.) over the client that
> > > connects to the coordinator this could delay some of these critical
> > > requests. Realistically I don't think it will be an issue except in
> > extreme
> > > scenarios where someone sets the memory limit to be unreasonably low.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Sun, Oct 30, 2016 at 12:32 PM, Jun Rao <j...@confluent.io> wrote:
> > >
> > > > Hi, Mickael,
> > > >
> > > > I agree with others that it's better to be able to control the bytes
> > the
> > > > consumer can read from sockets, instead of limiting the fetch
> requests.
> > > > KIP-72 has a proposal to bound the memory size at the socket selector
> > > > level. Perhaps that can be leveraged in this KIP too.
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > > 72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, Oct 27, 2016 at 3:23 PM, Jay Kreps <j...@confluent.io> wrote:
> > > >
> > > > > This is a good observation on limiting total memory usage. If I
> > > > understand
> > > > > the proposal I think it is that the consumer client would stop
> > sending
> > > > > fetch requests once a certain number of in-flight fetch requests is
> > > met.
> > > > I
> > > > > think a better approach would be t

Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-10-31 Thread Jay Kreps
Xavier,

Yeah I think that post KIP-58 it is possible to depend on the delivery of
messages in compacted topics, if you override the default compaction time.
Prior to that it was true that you could control the delete retention, but
any message including the tombstone could be compacted away prior to
delivery. That was what I meant by non-deterministic. Now that we have that
KIP I agree that at least it can be given an SLA like any other message
(which was what I was meaning by deterministic).

-Jay

On Fri, Oct 28, 2016 at 10:15 AM, Xavier Léauté  wrote:

> >
> > I kind of agree with James that it is a bit questionable how valuable any
> > data in a delete marker can be since it will be deleted somewhat
> > nondeterministically.
> >
>
> One could argue that even in normal topics, assuming a time-based log
> retention policy is in place, any message will be deleted somewhat
> nondeterministally, so why treat the compacted ones any differently? To me
> at least, the retention setting for delete messages seems to be the
> counterpart to the time-based retention setting for normal topics.
>
> Currently the semantics of the messages are in the eye of the beholder--you
> > can choose to interpret a stream as either being appends or revisions as
> > you choose. This proposal is changing that so that the semantics are
> > determined by the sender.
>
>
> Let's imagine someone wanted to augment this stream to include audit logs
> for each record update, e.g. which user made the change. One would want to
> include that information as part of the message, and have the ability to
> mark a deletion.
>
> I don't think it changes the semantics in this case, you can still choose
> to interpret the data as a stream of audit log entries (inserts), ignoring
> the tombstone flag, or you can interpret it as a table modeling only the
> latest version of each record. Whether a compacted or normal topic is used
> shouldn't matter to the sender.
>


Re: [DISCUSS] KIP-81: Max in-flight fetches

2016-10-27 Thread Jay Kreps
This is a good observation on limiting total memory usage. If I understand
the proposal I think it is that the consumer client would stop sending
fetch requests once a certain number of in-flight fetch requests is met. I
think a better approach would be to always issue one fetch request to each
broker immediately, allow the server to process that request, and send data
back to the local machine where it would be stored in the socket buffer (up
to that buffer size). Instead of throttling the requests sent, the consumer
should ideally throttle the responses read from the socket buffer at any
given time. That is, in a single poll call, rather than reading from every
single socket it should just read until it has a given amount of memory
used then bail out early. It can come back and read more from the other
sockets after those messages are processed.

The advantage of this approach is that you don't incur the additional
latency.

-Jay

On Mon, Oct 10, 2016 at 6:41 AM, Mickael Maison 
wrote:

> Hi all,
>
> I would like to discuss the following KIP proposal:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 81%3A+Max+in-flight+fetches
>
>
> Feedback and comments are welcome.
> Thanks !
>
> Mickael
>


Re: [DISCUSS] KIP-87 - Add Compaction Tombstone Flag

2016-10-27 Thread Jay Kreps
I kind of agree with James that it is a bit questionable how valuable any
data in a delete marker can be since it will be deleted somewhat
nondeterministically.

Let's definitely ensure the change is worth the resulting pain and
additional complexity in the data model.

I think the two things we maybe conflated in the original compaction work
was the semantics of the message and its retention policy (I'm not sure,
but maybe).

In some sense a normal Kafka topic is a stream of pure appends (inserts). A
compacted topic is a series of revisions to the keyed entity--updates or
deletes.

Currently the semantics of the messages are in the eye of the beholder--you
can choose to interpret a stream as either being appends or revisions as
you choose. This proposal is changing that so that the semantics are
determined by the sender.

So in Kafka Streams you could have a KTable of "customer account updates"
which would model the latest version of each customer account record; you
could also have a KStream which would model the stream of updates being
made to customer account records. You can create either of these off the
same topic---the semantics come from your interpretation not the data
itself. Both of these interpretations actually make sense: if you want to
count the number of accounts in a given geographical region you want to
compute that off the KTable, if you want to count the number of account
modifications you want to compute that off the KStream.

This proposal changes this slightly. Now we are saying the semantics of the
message are set by the sender. I'm not sure if this is better or worse--it
seems a little at odds with what we are doing in streams. If we are going
to change it I wonder if there aren't actually three types of message
{INSERT, UPDATE, DELETE}. (And by update I really mean "upsert"). Compacted
topics only make sense if your topic contains only UPDATE and/or DELETE
messages. "Normal" topics are pure inserts.

If we are making this change how does it effect streams? What happens if I
send a DELETE message to a non-compacted topic where deletes are impossible?

-Jay



On Tue, Oct 25, 2016 at 9:09 AM, Michael Pearce 
wrote:

> Hi All,
>
> I would like to discuss the following KIP proposal:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 87+-+Add+Compaction+Tombstone+Flag
>
> This is off the back of the discussion on KIP-82  / KIP meeting where it
> was agreed to separate this issue and feature. See:
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.
> mbox/%3cCAJS3ho8OcR==EcxsJ8OP99pD2hz=iiGecWsv-
> EZsBsNyDcKr=g...@mail.gmail.com%3e
>
> Thanks
> Mike
>
> The information contained in this email is strictly confidential and for
> the use of the addressee only, unless otherwise indicated. If you are not
> the intended recipient, please do not read, copy, use or disclose to others
> this message or any attachment. Please also notify the sender by replying
> to this email or by telephone (+44(020 7896 0011) and then delete the email
> and any copies of it. Opinions, conclusion (etc) that do not relate to the
> official business of this company shall be understood as neither given nor
> endorsed by it. IG is a trading name of IG Markets Limited (a company
> registered in England and Wales, company number 04008957) and IG Index
> Limited (a company registered in England and Wales, company number
> 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> London EC4R 2YA. Both IG Markets Limited (register number 195355) and IG
> Index Limited (register number 114059) are authorised and regulated by the
> Financial Conduct Authority.
>


Re: [VOTE] Add REST Server to Apache Kafka

2016-10-25 Thread Jay Kreps
-1

I think the REST server for Kafka that already exists is quite good and
getting contributions. Moving this into the core project doesn't solve a
problem that I see.

-Jay

On Tue, Oct 25, 2016 at 2:16 PM, Harsha Chintalapani 
wrote:

> Hi All,
>We are proposing to have a REST Server as part of  Apache Kafka
> to provide producer/consumer/admin APIs. We Strongly believe having
> REST server functionality with Apache Kafka will help a lot of users.
> Here is the KIP that Mani Kumar wrote
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 80:+Kafka+Rest+Server.
> There is a discussion thread in dev list that had differing opinions on
> whether to include REST server in Apache Kafka or not. You can read more
> about that in this thread
> http://mail-archives.apache.org/mod_mbox/kafka-dev/201610.mbox/%3CCAMVt_
> aymqeudm39znsxgktpdde46sowmqhsxop-+jmbcuv7...@mail.gmail.com%3E
>
>   This is a VOTE thread to check interest in the community for
> adding REST Server implementation in Apache Kafka.
>
> Thanks,
> Harsha
>


Re: [DISCUSS] KIP-80: Kafka REST Server

2016-10-24 Thread Jay Kreps
gt; >> > > voted on is:
> >> > >
> >> > > Is Apache Kafka Community only about the Core or does the apache
> >> > community
> >> > > also support some subprojects (and just we need some better way to
> >> manage
> >> > > this)
> >> > >
> >> > > If vote for Core only wins, then the following should be removed:
> >> > > Kafka Connect
> >> > > Kafka Stream
> >> > >
> >> > > If vote for Core only loses (aka we will support subprojects) then:
> >> > > We should look to add Kafka Rest
> >> > >
> >> > > And we should look to see how we can manage better govern and manage
> >> > > submodules.
> >> > >
> >> > > A good example which id propose here is how some other communities
> >>in
> >> > > Apache do this.
> >> > >
> >> > > Each Module has a Module Management Committee(MMC), this is like
> >almost
> >> > > the PMC but at a per module basis.
> >> > >
> >> > > This MMC should essentially hold the binding votes for that module.
> >> > > The MMC should be made up of a single representative from each
> >> > > organisation  (so no single organisation can fully veto the
> >>community
> >> it
> >> > > has to a genuine consenus)
> >> > > The MMC requires at least 3 members (so there cant be a tied vote on
> >2)
> >> > > For a new Module to be added a MMC committee should be sought
> >> > > A new Module is only capable of being added if the above
> >>requirements
> >> can
> >> > > be met (e.g. 3 people wishing to step up, from 3 organisations) so
> >that
> >> > > only actively support modules would be added
> >> > >
> >> > > The PMC reviews each module every 6months or Year. If MMC is
> >>inactive,
> >> a
> >> > > vote/call to find replacements if raised, if none are forthcoming
> >> > dropping
> >> > > the MMC to less than 3 then the module moves to "the attic" (very
> >>much
> >> > like
> >> > > apache attic but a little more aggressively)
> >> > >
> >> > > This way the PMC does not need to micro manage every module
> >> > > We only add modules where some amount of active support and
> >maintenance
> >> > > and use is provided by the community
> >> > > We have an automatic way to retire old or inactive projects.
> >> > >
> >> > > Thoughts?
> >> > > Mike
> >> > >
> >> > >
> >> > > 
> >> > > From: Harsha Ch <harsha...@gmail.com>
> >> > > Sent: Thursday, October 20, 2016 10:26 PM
> >> > > To: dev@kafka.apache.org
> >> > > Subject: Re: [DISCUSS] KIP-80: Kafka REST Server
> >> > >
> >> > > Jay,
> >> > >   REST API is something every user is in need of. If the
> >>argument
> >> is
> >> > to
> >> > > clone and write your  API, this will do a disservice to the users as
> >> they
> >> > > now have to choose one vs. others instead of keeping one API that is
> >> > > supported in Kafka community.
> >> > >
> >> > > "Pre-emptively re-creating another
> >> > > REST layer when it seems like we all quite agree on what needs to be
> >> done
> >> > > and we have an existing code base for HTTP/Kafka access that is
> >heavily
> >> > > used in production seems quite silly."
> >> > >
> >> > >Exactly our point. Why can't we develop this in Apache Kafka
> >> > > community? Instead of us open sourcing another GitHub project and
> >> > creating
> >> > > a divide in users and another version of API. Let's build this in
> >Kafka
> >> > > Community and use the governance model that is proven to provide
> >vendor
> >> > > free user driven consensus features. The argument that is adding
> >>this
> >> > REST
> >> > > server to Kafka will affect the agility of the project doesn't mak
> >> sense.
> >> > >
> >> > > It looks like your argument is either we develop all these small
> >>tools

Re: [DISCUSS] KIP-80: Kafka REST Server

2016-10-21 Thread Jay Kreps
Harsha,

You seem to be saying that your only two options are to fork or duplicate
the existing REST project or add it to Kafka to be able to contribute. I
don't think those are the only two options. The other option is to
contribute to the existing successful project--which is Apache licensed and
getting active contribution today. You always have the power to
fork/duplicate later if you don't like the governance/community/direction.
Saying that you have to do this proactively doesn't really make sense.

-Jay

On Thu, Oct 20, 2016 at 2:27 PM, Harsha Chintalapani <ka...@harsha.io>
wrote:

> Jay,
>   REST API is something every user is in need of. If the argument is to
> clone and write your  API, this will do a disservice to the users as they
> now have to choose one vs. others instead of keeping one API that is
> supported in Kafka community.
>
> "Pre-emptively re-creating another
> REST layer when it seems like we all quite agree on what needs to be done
> and we have an existing code base for HTTP/Kafka access that is heavily
> used in production seems quite silly."
>Exactly our point. Why can't we develop this in Apache Kafka
> community? Instead of us open sourcing another GitHub project and creating
> a divide in users and another version of API. Let's build this in Kafka
> Community and use the governance model that is proven to provide vendor
> free user driven consensus features. The argument that is adding this REST
> server to Kafka will affect the agility of the project doesn't mak sense.
>
> It looks like your argument is either we develop all these small tools or
> none at all. We as a community need to look at supporting critical
> tools/API. Instead of dividing this project into individual external
> communities. We should build this as part of Kafka which best serves the
> needs of users.
> The Streams and Connect projects that were pushed into Kafka could
> have been left in their own Github projects based on your arguments. What
> about the REST API is so different that such that it should stay out of the
> Kafka project? From my experience, more users are asking for the REST API.
>
> Thanks,
> Harsha
>
> On Sun, Oct 16, 2016 at 5:19 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>
> > I guess no one doubts its power on REST server or even UI. I understand
> the
> > difficulty to add a module to project, but it's maximized when there is
> > less support expected hence maintenance issue is likely to rise, and IMHO
> > this seems to be not the case.
> >
> > There're also pain points when project doesn't maintain features and
> > delegates to ecosystem. Based on some points (last commit date, pull
> > request open and closed, and contributor graph), kafka-manager seems to
> > have similar activity to kafka-rest, but it doesn't show any responses
> for
> > pull request supporting Kafka 0.10.0 even though numerous users leave
> > comments wish to support. What Kafka community can do for that project to
> > follow up? Nothing but just persuading by leaving comments hoping that
> will
> > be merged. (or finally come up another implementation) Kafka project
> keeps
> > agile but in point of whole ecosystem it can be less agile.
> >
> > Yes decisions and roadmap of the project are driven by PMCs and I think
> > it's valid right. But we also imagine ASF projects as driven by community
> > aspect, though it's alike to ideal world. KIP makes innovation on
> adopting
> > new feature transparently, which makes many developers inspiring and
> > adopting it to their projects. Hopes that Kafka community continuously
> > drives the transparency model among the ASF projects, and beyond.
> >
> > - Jungtaek Lim (HeartSaVioR)
> >
> > 2016년 10월 17일 (월) 오전 7:56, Jay Kreps <j...@confluent.io>님이 작성:
> >
> > Hey Nacho,
> >
> > Yeah, I think it is definitely a call we have to make case by case. We
> have
> > some experience with this: originally we attempted to maintain things
> like
> > non-java clients, a hadoop connector, etc all in the main project. The
> > difficulty of that lead us to the current federated approach. In terms of
> > what is included now, yes, I agree you could potentially have even less
> > included.
> >
> > -Jay
> >
> > On Wed, Oct 12, 2016 at 11:37 AM, Nacho Solis
> <nso...@linkedin.com.invalid
> > >
> > wrote:
> >
> > > What is the criteria for keeping things in and out of Kafka, what code
> > goes
> > > in or out and what is part of the architecture or not?
> > >
> > > The discussion of what goes into a project and what stays out is

Re: [DISCUSS] KIP-80: Kafka REST Server

2016-10-16 Thread Jay Kreps
Hey Nacho,

Yeah, I think it is definitely a call we have to make case by case. We have
some experience with this: originally we attempted to maintain things like
non-java clients, a hadoop connector, etc all in the main project. The
difficulty of that lead us to the current federated approach. In terms of
what is included now, yes, I agree you could potentially have even less
included.

-Jay

On Wed, Oct 12, 2016 at 11:37 AM, Nacho Solis <nso...@linkedin.com.invalid>
wrote:

> What is the criteria for keeping things in and out of Kafka, what code goes
> in or out and what is part of the architecture or not?
>
> The discussion of what goes into a project and what stays out is an always
> evolving question. Different projects treat this in different ways.
>
> Let me paint 2 extremes.  On one side, you have a single monolithic project
> that brings everything in one tent.  On the other side you have the many
> modules approach.  From what I've learned, Kafka falls in the middle.
> Because of this, the question is bound to come up with respect to the
> criteria used to bring something into the fold.
>
> I'll be the first to point out that the distinction between modules,
> architecture, software, repositories, governance and community are blurry.
> Not to mention that many things are how they are for historical reasons.
>
> I, personally, can't understand why we would not have REST as part of the
> main Kafka project given that a lot of people use it and we include many
> things with the current distribution.  What many things you may ask?  Well,
> if we took the modular approach Kafka is a mixture of components, here's
> the first 4 that come to mind:
> 1. The Kafka protocol
> 2. The Kafka java libraries
> 3. The Kafka broker
> 4. The Kafka stream framework
> 5. Kafka Connect
> 6. MirrorMaker
>
> All of these could be separate products. You should be able to evolve each
> one independently.  Even if they have dependencies on each other, you could
> potentially replace one part.
>
> The choice of keeping them all in a single repository, with a single
> distribution, under the same governance and community, brings a number of
> trade offs.  It's easy to keep things coherent for example.  There is less
> of a need to rely on inherent versioning and compatibility (which we end up
> providing anyway because of the way people usually deploy kafka). We all
> focus our efforts on a single code base.
>
> The downside is that it's harder to remove modules that are old or unused.
> Modules that are only used by a small subset of the community will have an
> impact on the rest of the community.  It mixes incentives of what people
> want to work on and what holds them back.  We also need to decide what
> belongs in the blessed bundle and what doesnt.
>
> So, my question boils down to, what criteria is used for bringing stuff in.
>
> If we have Streams and MirrorMaker and Connect in there, why not have REST?
> Specially if there is more than one person/group willing to work on it?
> Alternatively, if REST is not included because it's not used by all, then
> why not remove Streams, Connect and MirrorMaker since they're definitely
> not used by all? I realize I say this even though at LinkedIn we have a
> REST setup of our own, just speaking from a community perspective.
>
> Nacho
>
>
> (I'm relatively new and I haven't read all of the mail archive, so I'm sure
> this has been brought up before, but I decided to chime in anyway)
>
> On Wed, Oct 12, 2016 at 8:03 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > I think the questions around governance make sense, I think we should
> > really clarify that to make the process more clear so it can be fully
> > inclusive.
> >
> > The idea that we should not collaborate on what is there now, though,
> > because in the future we might disagree about direction does not really
> > make sense to me. If in the future we disagree, that is the beauty of
> open
> > source, you can always fork off a copy of the code and start an
> independent
> > project either in Apache or elsewhere. Pre-emptively re-creating another
> > REST layer when it seems like we all quite agree on what needs to be done
> > and we have an existing code base for HTTP/kafka access that is heavily
> > used in production seems quite silly.
> >
> > Let me give some background on how I at least think about these things.
> > I've participated in open source projects out of LinkedIn via github as
> > well as via the ASF. I don't think there is a "right" answer to how to do
> > these but rather some tradeoffs. We thought about this quite a lot in the
> > context of Kafka based on the experi

Re: [DISCUSS] KIP-80: Kafka REST Server

2016-10-12 Thread Jay Kreps
> For 1, I don't think there is value in giving in to the NIH syndrome
> > >>and
> > >> > reinventing the wheel. What I'm looking for is a detailed comparison
> > >>of
> > >> the
> > >> > gaps and why those can't be improved in the REST proxy that already
> > >> exists
> > >> > and is actively maintained. For example, we depend on zkClient and
> > >>have
> > >> > found as well as fixed several bugs by working closely with the
> people
> > >> who
> > >> > maintain zkClient. This should be possible for REST proxy as well,
> > >>right?
> > >> >
> > >> > For 2, I'd like us to review our history of expanding the surface
> > >>area to
> > >> > add more clients in the past. Here is a summary -
> > >> >
> > >> >- This doesn't materially have an impact on expanding the
> > >>usability of
> > >> >Kafka. In my experience, REST proxy + Java clients only cover
> ~50%
> > >>of
> > >> > all
> > >> >Kafka users, and maybe 10% of those are the ones who will use the
> > >>REST
> > >> >proxy. The remaining 50% are non-java client users (C, python,
> go,
> > >> node
> > >> >etc).
> > >> >- People are a lot more excited about promising to contribute
> while
> > >> >adding the surface area but not on an ongoing basis down the
> line.
> > >> >- More surface area means more work to keep things consistent.
> > >>Failure
> > >> >to do that has, in fact, hurt the user experience.
> > >> >- More surface area hurts agility. We want to do a few things
> > >>really
> > >> >well as well as be agile to be able to build on our core
> > >>competency.
> > >> >
> > >> >
> > >> > On Sat, Oct 1, 2016 at 5:38 AM Manikumar <manikumar.re...@gmail.com
> >
> > >> > wrote:
> > >> >
> > >> > > Hi Jay,
> > >> > >
> > >> > > Thanks for your reply.
> > >> > >
> > >> > > I agree that we can not add all the clients/tools available in
> > >> ecosystem
> > >> > > page to Kafka repo itself. But we feel REST Interface is different
> > >>from
> > >> > > other clients/tools. Since any language that can work with HTTP
> can
> > >> > > easily integrate with this interface, Having an "official"  REST
> > >> > > interface helps user community. This also helps us to integrate
> well
> > >> > > with external management and provisioning tools.  Apache Kafka
> > >>release
> > >> > > with Java clients + REST interface is sufficient for most of the
> > >>user
> > >> > > deployments/requirements. This helps users to deal with less
> number
> > >> > > of distributions/builds.
> > >> > >
> > >> > > Thanks,
> > >> > > Manikumar
> > >> > >
> > >> > >
> > >> > > On Sat, Oct 1, 2016 at 4:24 AM, Jay Kreps <j...@confluent.io>
> wrote:
> > >> > >
> > >> > > > Hey guys,
> > >> > > >
> > >> > > > There's already a REST interface maintained as a separate
> > >> project--it's
> > >> > > > open source and apache licensed and actively maintained (
> > >> > > > https://github.com/confluentinc/kafka-rest). What is wrong with
> >
> >
> >
> > GitHub - confluentinc/kafka-rest: REST Proxy for Kafka
> > github.com
> > The Kafka REST Proxy provides a RESTful interface to a Kafka cluster. It
> > makes it easy to produce and consume messages, view the state of the
> > cluster, and ...
> >
> > >> that?
> > >> > > You
> > >> > > > mentioned that there was some compatibility concern, but
> > >> compatibility
> > >> > > has
> > >> > > > to do with the consumer protocol guarantees not the repo the
> code
> > >>is
> > >> > in,
> > >> > > > right? Not sure that concern makes sense.
> > >> > > >
> > >> > > > We could argue for adding pretty much anything and everything in
> > >>the
> > >> > > > ecosystem page in Kafka itself but I'm not sure that would make
> > >>the
> > >> > > project
> > >> > > > more agile.
> > >> > > >
> > >> > > > -Jay
> > >> > > >
> > >> > > > On Wed, Sep 28, 2016 at 12:04 AM, Manikumar <
> > >> manikumar.re...@gmail.com
> > >> > >
> > >> > > > wrote:
> > >> > > >
> > >> > > > > Hi Kafka Devs,
> > >> > > > >
> > >> > > > > I created KIP-80 to add Kafka REST Server to Kafka Repository.
> > >> > > > >
> > >> > > > > There are already open-source alternatives are available.  But
> > >>we
> > >> > would
> > >> > > > > like to add REST server that
> > >> > > > > many users ask for under Apache Kafka repo. Many data Infra
> > >>tools
> > >> > comes
> > >> > > > up
> > >> > > > > with Rest Interface.
> > >> > > > > It is useful to have inbuilt Rest API support for Produce,
> > >>Consume
> > >> > > > messages
> > >> > > > > and admin interface for
> > >> > > > > integrating with external management and provisioning
> tools.This
> > >> will
> > >> > > > also
> > >> > > > > allow the maintenance of
> > >> > > > > REST server and adding new features makes it easy because
> apache
> > >> > > > community.
> > >> > > > >
> > >> > > > > The KIP wiki is the following:
> > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> > > > > 80%3A+Kafka+Rest+Server
> > >> > > > >
> > >> > > > > Your comments and feedback are welcome.
> > >> > > > >
> > >> > > > > Thanks,
> > >> > > > > Manikumar
> > >> > > > >
> > >> > > >
> > >> > >
> > >> > --
> > >> > Thanks,
> > >> > Neha
> > >> >
> > >>
> > The information contained in this email is strictly confidential and for
> > the use of the addressee only, unless otherwise indicated. If you are not
> > the intended recipient, please do not read, copy, use or disclose to
> others
> > this message or any attachment. Please also notify the sender by replying
> > to this email or by telephone (+44(020 7896 0011) and then delete the
> email
> > and any copies of it. Opinions, conclusion (etc) that do not relate to
> the
> > official business of this company shall be understood as neither given
> nor
> > endorsed by it. IG is a trading name of IG Markets Limited (a company
> > registered in England and Wales, company number 04008957) and IG Index
> > Limited (a company registered in England and Wales, company number
> > 01190902). Registered address at Cannon Bridge House, 25 Dowgate Hill,
> > London EC4R 2YA. Both IG Markets Limited (register number 195355) and IG
> > Index Limited (register number 114059) are authorised and regulated by
> the
> > Financial Conduct Authority.
> >
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2016-10-07 Thread Jay Kreps
Hey guys,

This discussion has come up a number of times and we've always passed.

One of things that has helped keep Kafka simple is not adding in new
abstractions and concepts except when the proposal is really elegant and
makes things simpler.

Consider three use cases for headers:

   1. Kafka-scope: We want to add a feature to Kafka that needs a
   particular field.
   2. Company-scope: You want to add a header to be shared by everyone in
   your company.
   3. World-wide scope: You are building a third party tool and want to add
   some kind of header.

For the case of (1) you should not use headers, you should just add a field
to the record format. Having a second way of encoding things doesn't make
sense. Occasionally people have complained that adding to the record format
is hard and it would be nice to just shove lots of things in quickly. I
think a better solution would be to make it easy to add to the record
format, and I think we've made progress on that. I also think we should be
insanely focused on the simplicity of the abstraction and not adding in new
thingies often---we thought about time for years before adding a timestamp
and I guarantee you we would have goofed it up if we'd gone with the
earlier proposals. These things end up being long term commitments so it's
really worth being thoughtful.

For case (2) just use the body of the message. You don't need a globally
agreed on definition of headers, just standardize on a header you want to
include in the value in your company. Since this is just used by code in
your company having a more standard header format doesn't really help you.
In fact by using something like Avro you can define exactly the types you
want, the required header fields, etc.

The only case that headers help is (3). This is a bit of a niche case and i
think is easily solved just making the reading and writing of given
required fields pluggable to work with the header you have.

A couple of specific problems with this proposal:

   1. A global registry of numeric keys is super super ugly. This seems
   silly compared to the Avro (or whatever) header solution which gives more
   compact encoding, rich types, etc.
   2. Using byte arrays for header values means they aren't really
   interoperable for case (3). E.g. I can't make a UI that displays headers,
   or allow you to set them in config. To work with third party headers, the
   only case I think this really helps, you need the union of all
   serialization schemes people have used for any tool.
   3. For case (2) and (3) your key numbers are going to collide like
   crazy. I don't think a global registry of magic numbers maintained either
   by word of mouth or checking in changes to kafka source is the right thing
   to do.
   4. We are introducing a new serialization primitive which makes fields
   disappear conditional on the contents of other fields. This breaks the
   whole serialization/schema system we have today.
   5. We're adding a hashmap to each record
   6. This proposes making the ProducerRecord and ConsumerRecord mutable
   and adding setters and getters (which we try to avoid).

For context on LinkedIn: I set up the system there, but it may have changed
since i left. The header is maintained with the record schemas in the avro
schema registry and is required for all records. Essentially all messages
must have a field named "header" of type EventHeader which is itself a
record schema with a handful of fields (time, host, etc). The header
follows the same compatibility rules as other avro fields, so it can be
evolved in a compatible way gradually across apps. Avro is typed and
doesn't require deserializing the full record to read the header. The
header information is (timestamp, host, etc) is important and needs to
propagate into other systems like Hadoop which don't have a concept of
headers for records, so I doubt it could move out of the value in any case.
Not allowing teams to chose a data format other than avro was considered a
feature, not a bug, since the whole point was to be able to share data,
which doesn't work if every team chooses their own format.

I agree with the critique of compaction not having a value. I think we
should consider fixing that directly.

-Jay

On Thu, Sep 22, 2016 at 12:31 PM, Michael Pearce 
wrote:

> Hi All,
>
>
> I would like to discuss the following KIP proposal:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 82+-+Add+Record+Headers
>
>
>
> I have some initial ?drafts of roughly the changes that would be needed.
> This is no where finalized and look forward to the discussion especially as
> some bits I'm personally in two minds about.
>
> https://github.com/michaelandrepearce/kafka/tree/kafka-headers-properties
>
>
>
> Here is a link to a alternative option mentioned in the kip but one i
> would personally would discard (disadvantages mentioned in kip)
>
> https://github.com/michaelandrepearce/kafka/tree/kafka-headers-full?
>
>

Re: [DISCUSS] KIP-80: Kafka REST Server

2016-10-06 Thread Jay Kreps
Hi Manikumar,

I agree totally agree that REST is important. What I don't understand is
why we'd duplicate the existing REST interface inside the Kafka project.
That seems to needlessly fragment things.

-Jay

On Sat, Oct 1, 2016 at 5:38 AM, Manikumar <manikumar.re...@gmail.com> wrote:

> Hi Jay,
>
> Thanks for your reply.
>
> I agree that we can not add all the clients/tools available in ecosystem
> page to Kafka repo itself. But we feel REST Interface is different from
> other clients/tools. Since any language that can work with HTTP can
> easily integrate with this interface, Having an "official"  REST
> interface helps user community. This also helps us to integrate well
> with external management and provisioning tools.  Apache Kafka release
> with Java clients + REST interface is sufficient for most of the user
> deployments/requirements. This helps users to deal with less number
> of distributions/builds.
>
> Thanks,
> Manikumar
>
>
> On Sat, Oct 1, 2016 at 4:24 AM, Jay Kreps <j...@confluent.io> wrote:
>
> > Hey guys,
> >
> > There's already a REST interface maintained as a separate project--it's
> > open source and apache licensed and actively maintained (
> > https://github.com/confluentinc/kafka-rest). What is wrong with that?
> You
> > mentioned that there was some compatibility concern, but compatibility
> has
> > to do with the consumer protocol guarantees not the repo the code is in,
> > right? Not sure that concern makes sense.
> >
> > We could argue for adding pretty much anything and everything in the
> > ecosystem page in Kafka itself but I'm not sure that would make the
> project
> > more agile.
> >
> > -Jay
> >
> > On Wed, Sep 28, 2016 at 12:04 AM, Manikumar <manikumar.re...@gmail.com>
> > wrote:
> >
> > > Hi Kafka Devs,
> > >
> > > I created KIP-80 to add Kafka REST Server to Kafka Repository.
> > >
> > > There are already open-source alternatives are available.  But we would
> > > like to add REST server that
> > > many users ask for under Apache Kafka repo. Many data Infra tools comes
> > up
> > > with Rest Interface.
> > > It is useful to have inbuilt Rest API support for Produce, Consume
> > messages
> > > and admin interface for
> > > integrating with external management and provisioning tools.This will
> > also
> > > allow the maintenance of
> > > REST server and adding new features makes it easy because apache
> > community.
> > >
> > > The KIP wiki is the following:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 80%3A+Kafka+Rest+Server
> > >
> > > Your comments and feedback are welcome.
> > >
> > > Thanks,
> > > Manikumar
> > >
> >
>


Including streams and connect better in site docs

2016-09-28 Thread Jay Kreps
Hey guys,

Gwen and I took a stab at better integrating Connect and Streams in the
Kafka site...they were largely absent in the api section, intro, home page,
etc. Take a look and see what you think. Major changes are the following:

   - Changed tag line from "a distributed messaging system" to "a
   distributed streaming platform" since kafka wasn't ever much of a messaging
   system and is definitely a lot more than that now.
   - Rewrote the introduction page
   - Stopped linking from the nav into sections in the doc

I think there is a lot more that could be done there--the design section
still dates largely from the 0.6 days.

The site is still super ugly, especially the graphic on the home page. As a
separate thing we'll start a discussion around restyling to improve the
look of things.

-Jay


Re: [DISCUSS] KIP-79 - ListOffsetRequest v1 and offsetForTime() method in new consumer.

2016-09-02 Thread Jay Kreps
This looks great, big improvements for the list offset protocol which is
currently quite odd.

One minor thing. I think the old v0 list offsets request also gave you the
highwater mark, it kind of shoves it in as the last thing in the array of
offsets. This is used internally to implement seekToEnd() iirc. How would
that work once v0 is removed?

Related, the wiki says:
"Another related feature missing in KafkaConsumer is the access of
partitions' high watermark. Typically, users only need the high watermark
in order to get the per partition lag. This seems more suitable to be
exposed through the metrics."

The obvious usage is computing lag for sure, and I agree that is really
more a metric than anything else, but I think that is not the only usage.
Here is a use case I think is quite important that requires knowing the
highwater mark:

Say you want to implement some kind of batch process that wakes up every 5
minutes or every hour or once a day and processes all the messages and then
goes back to sleep. The naive way to do that would be to poll() until you
don't get any more records, but this is broken in two minor ways, first
maybe you didn't get records because you are rebalancing and second this
might never happen if new records are always getting written. A better
approach is for your process, when it begins, to look at the current end of
the log and process only up to that offset.

This is important for Kafka Streams or anything else that wants to have a
kind of batch-like mode.

Technically you can do this by seeking to the end, checking your position,
then starting over, as people do today. But I think we can agree that is
kind of silly.

An alternative would be to rename TimestampOffset to something like
PartitionOffsets and have it have both the timestamp and offset as well as
the beginning offset and highwatermark for the partition. The underlying
protocol would need these two.

Cheers,

-Jay

On Tue, Aug 30, 2016 at 8:38 PM, Becket Qin  wrote:

> Hi Kafka devs,
>
> I created KIP-79 to allow consumer to precisely query the offsets based on
> timestamp.
>
> In short we propose to :
> 1. add a ListOffsetRequest/ListOffsetResponse v1, and
> 2. add an offsetForTime() method in new consumer.
>
> The KIP wiki is the following:
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65868090
>
> Comments are welcome.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


[jira] [Comment Edited] (KAFKA-4113) Allow KTable bootstrap

2016-09-01 Thread Jay Kreps (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15456296#comment-15456296
 ] 

Jay Kreps edited comment on KAFKA-4113 at 9/1/16 6:52 PM:
--

Don't you get this naturally out of the message timestamps and the 
prioritization we already do? People say they want to  "fully populate" a table 
but i think this isn't true. Rather you want the table to be in the same state 
the associated streams would be in. To see the difference imagine a case where 
you have a job that is doing a stream-table join and say you lose all your 
materialized table state and have a job that is down for three hours (for 
whatever reason--maintenance or something). When it comes back up you don't 
actually want to catch all the way up on the table because if you do that you 
will be joining table data from now to stream data from three hours ago. 
Rather, what you want is to catch up the table to three hours ago and then keep 
the two roughly aligned so you are joining stream data from time X to the state 
of the table at time X.

But isn't this exactly what the time stamp prioritization does already? It 
naturally leads to you catching up on populating the table first if that data 
is older, right?


was (Author: jkreps):
Don't you get this naturally out of the message timestamps and the 
prioritization we already do? People say they want to  "fully populate" a table 
but i think this isn't true. Rather you want the table to be in the same state 
the associated streams would be in. To see the difference imagine a case where 
you have a job that is doing a stream-table join and say you lose all your 
materialized table state and have a job that is down for three hours (for 
whatever reason--maintenance or something). When it comes back up you don't 
actually want to catch all the way up on the table because if you do that you 
will be joining table data from now to stream data from three hours ago. 
Rather, what you want is to catch up the table to three hours ago and then keep 
the two roughly aligned so you are joining stream data from time X to the state 
of the table at time X.

But isn't this exactly what the time stamp prioritization does already?

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>
> On the mailing list, there are multiple request about the possibility to 
> "fully populate" a KTable before actual stream processing start.
> Even if it is somewhat difficult to define, when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is, that there is a rarely updated topic that contains the 
> data. Only after this topic got read completely and the KTable is ready, the 
> application should start processing. This would indicate, that on startup, 
> the current partition sizes must be fetched and stored, and after KTable got 
> populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API desing is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


  1   2   3   4   5   6   7   8   9   10   >