Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-08 Thread Jan Filipiak

Hi Dong,

are you actually reading my emails, or are you just using the thread I 
started for general announcements regarding the KIP?


I tried to argue really hard against linear hashing. Growing the topic 
by an integer factor does not require any state redistribution at all. I 
completely fail to see where linear hashing helps on log-compacted topics.


If you are not willing to explain to me what I might be overlooking: 
that is fine.
But I ask you to not reply to my emails then. Please understand my 
frustration with this.


Best Jan


On 06.03.2018 19:38, Dong Lin wrote:

Hi everyone,

Thanks for all the comments! It appears that everyone prefers linear
hashing because it reduces the amount of state that needs to be moved
between consumers (for stream processing). The KIP has been updated to use
linear hashing.

Regarding the migration effort: it seems that migrating the producer library
to use linear hashing should be pretty straightforward without
much operational effort. If we don't upgrade the client library to use this
KIP, we cannot support in-order delivery after the partition count is changed
anyway. Suppose we upgrade the client library to use this KIP: if the partition
number is not changed, the key -> partition mapping will be exactly the
same as it is now because it is still determined using murmur_hash(key) %
original_partition_num. In other words, this change is backward compatible.
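
As a rough illustration of that mapping, a linear-hashing-aware partitioner
could look like the sketch below. This is not the actual DefaultPartitioner
code; murmurHash/toPositive are stand-ins for the producer's existing murmur2
utilities, and only the key-to-partition math is shown.

class LinearHashingPartitionerSketch {

    // Sketch only: linear hashing on top of the existing murmur2-style hash.
    // With currentPartitionNum == originalPartitionNum this reduces to the
    // current behavior, murmur_hash(key) % original_partition_num.
    static int partitionFor(byte[] keyBytes, int originalPartitionNum, int currentPartitionNum) {
        int hash = toPositive(murmurHash(keyBytes));

        // Largest power-of-two multiple of the original count that fits into the current count.
        int baseSize = originalPartitionNum;
        while (baseSize * 2 <= currentPartitionNum) {
            baseSize *= 2;
        }

        int bucket = hash % baseSize;
        // Buckets below (currentPartitionNum - baseSize) have already been split once more,
        // so their keys are spread over a doubled modulus.
        if (bucket < currentPartitionNum - baseSize) {
            bucket = hash % (baseSize * 2);
        }
        return bucket;
    }

    // Placeholders for the producer's existing hash utilities.
    static int murmurHash(byte[] bytes) { return java.util.Arrays.hashCode(bytes); }
    static int toPositive(int h) { return h & 0x7fffffff; }
}

Note that when the current partition count is an exact power-of-two multiple of
the original count, every bucket has been split the same number of times, which
is why doubling keeps the load even.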

Regarding the load distribution: if we use linear hashing, the load may be
unevenly distributed because those partitions which are not split may
receive twice as much traffic as the partitions that are split. This
issue can be mitigated by creating the topic with a partition count that is
several times the number of consumers. And there will be no imbalance if the
partition number is always doubled. So this imbalance seems acceptable.
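
As a concrete illustration of that imbalance (using the linear-hashing sketch
above): growing a topic from 4 to 6 partitions splits the key ranges of
partitions 0 and 1 across {0, 4} and {1, 5}, while partitions 2 and 3 keep
their full ranges. With uniformly distributed keys, partitions 2 and 3 then
each see roughly 1/4 of the traffic while the four split partitions see
roughly 1/8 each. Doubling from 4 to 8 splits every partition, so all 8
partitions see roughly 1/8 each and the load stays even.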

Regarding storing the partition strategy as a per-topic config: it seems
unnecessary, since we can still use murmur_hash as the default hash function
and additionally apply the linear hashing algorithm if the partition number
has increased. I am not sure there is any use-case for a producer to use a
different hash function. Jason, can you check if there is some use-case
that I missed for using the per-topic partition strategy?

Regarding how to reduce latency (due to state store/load) in the stream
processing consumer when the partition number changes: I need to read the Kafka
Streams code to understand how Kafka Streams currently migrates state between
consumers when application instances are added/removed for a given job. I will
reply after I finish reading the documentation and code.


Thanks,
Dong


On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson  wrote:


Great discussion. I think I'm wondering whether we can continue to leave
Kafka agnostic to the partitioning strategy. The challenge is communicating
the partitioning logic from producers to consumers so that the dependencies
between each epoch can be determined. For the sake of discussion, imagine
you did something like the following:

1. The name (and perhaps version) of a partitioning strategy is stored in
topic configuration when a topic is created.
2. The producer looks up the partitioning strategy before writing to a
topic and includes it in the produce request (for fencing). If it doesn't
have an implementation for the configured strategy, it fails.
3. The consumer also looks up the partitioning strategy and uses it to
determine dependencies when reading a new epoch. It could either fail or
make the most conservative dependency assumptions if it doesn't know how to
implement the partitioning strategy. For the consumer, the new interface
might look something like this:

// Return the partition dependencies following an epoch bump
Map dependencies(int numPartitionsBeforeEpochBump,
int numPartitionsAfterEpochBump)

The unordered case then is just a particular implementation which never has
any epoch dependencies. To implement this, we would need some way for the
consumer to find out how many partitions there were in each epoch, but
maybe that's not too unreasonable.
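
For concreteness, one possible reading of that interface is sketched below;
the enclosing interface name and the generic types are my assumption, since
the type parameters did not survive in the message above.

import java.util.List;
import java.util.Map;

// One possible reading of the consumer-side hook sketched above. Only the
// method name and parameters come from the message itself.
public interface PartitioningStrategy {

    // Return the partition dependencies following an epoch bump: for each partition
    // of the new epoch, the partitions of the previous epoch that must be fully
    // consumed first to preserve per-key ordering.
    Map<Integer, List<Integer>> dependencies(int numPartitionsBeforeEpochBump,
                                             int numPartitionsAfterEpochBump);
}

The unordered case mentioned above would then simply return an empty map (no
epoch dependencies).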

Thanks,
Jason


On Mon, Mar 5, 2018 at 4:51 AM, Jan Filipiak 
wrote:


Hi Dong

thank you very much for your questions.

regarding the time spent copying data across:
It is correct that copying data from a topic with one partition mapping to
a topic with a different partition mapping takes way longer than we can
stop producers. Tens of minutes is a very optimistic estimate here. Many
people cannot afford to copy at full steam and therefore will have some rate
limiting in place; this can push the timespan into days. The good part
is that the vast majority of the data can be copied while the producers are
still going. One can then piggyback the consumers on top of this timeframe,
by the method mentioned (provide them a mapping from their old offsets to
new offsets in their repartitioned topics). In that way we separate
migration of 

Jenkins build is back to normal : kafka-trunk-jdk7 #3232

2018-03-08 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-255: OAuth Authentication via SASL/OAUTHBEARER

2018-03-08 Thread Rajini Sivaram
Hi Ron,

Thanks for the KIP. Sorry for the delay in reviewing this. I have a few
questions/comments.


   1. Are all of the classes listed in the KIP intended to be public
   classes/interfaces? Since it requires more effort to maintain public
   classes, it will be good if we can make more of the code internal. For
   example, some of the classes in `oauthbearer.refresh` and
   `oauthbearer.smo` could be made internal?
   2. Agree that it is better to use unchecked exceptions. Kafka uses
   unchecked exceptions elsewhere for the same reasons you described.
   3. ExpiringCredentialxxx/RefreshConfigxxx: When OAuth is added, we will
   have two mechanisms using token refresh (Kerberos and OAuth). The KIP
   doesn't propose to unify the refresh logic for the two (that is fine).
   Since these interfaces are going to be used just for OAuth, I would keep
   them internal for now. If we do add another mechanism in future that can
   reuse this code, we could make them public, taking into account any
   changes required. It doesn't look like we would want to customise these for
   different OAuthBearer implementations.
   4. ExpiringCredentialLoginModule/ExpiringCredentialRefreshingLogin: I
   understand why the interfaces have additional methods to associate login
   with mechanism. But in 1.1, we introduced sasl.jaas.config property for
   brokers, which associates login module to mechanism. The broker property
   name is prefixed with listener name and mechanism (e.g.
   listener.name.sasl_ssl.oauthbearer.sasl.jaas.config). We will continue
   to support multiple login modules within a login context in the JAAS config
   file for backward compatibility, but I don't think we need to add
   additional interfaces to support this for new mechanisms since we can just
   use the property instead. With the new property that uses different
   contexts for different mechanisms, LoginManager is different for each
   context (i.e. each mechanism). So it should be possible to make this
   code simpler. The property also makes it easier to dynamically update
   configs corresponding to a mechanism.
   5. SubstitutableModuleOptions: These look very generic and usable for
   other mechanisms. I think they ought to be in a separate package not under
   oauthbearer. It will be good to enable this for PLAIN and SCRAM as well.
   This could even be in a separate KIP. Perhaps the package name could be a
   word since smo is not a standard abbreviation?
   6. OAuthSaslServer: Another option is to keep this internal without an
   additional interface and return OAuthBearerToken from
   *SaslServer.getNegotiatedProperty(String propName)* with a publicly defined
   property name (see the sketch after this list).
   7. OAuthBearerTokenValidator: I think this should be defined as a
   server-side Callback to be consistent with other mechanisms. Different
   OAuthBearer implementations could just use different callback handlers,
   which will be configurable anyway.
   8. OAuthBearerTokenRetriever: This could perhaps be a login callback if
   we made the login callback handler configurable.
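
For item 6, a rough broker-side sketch of the getNegotiatedProperty option
might look like the following. SaslServer.getNegotiatedProperty(String) is an
existing JDK API, but the helper class and the property name constant are made
up for illustration, and OAuthBearerToken refers to the interface proposed in
this KIP.

import javax.security.sasl.SaslServer;

// Sketch of option 6: no extra OAuthSaslServer interface; the broker pulls the
// token out of the negotiated properties after authentication completes.
public final class OAuthBearerTokenLookup {

    // Hypothetical, publicly defined property name.
    public static final String OAUTHBEARER_TOKEN_PROPERTY = "OAUTHBEARER.token";

    public static OAuthBearerToken tokenFrom(SaslServer saslServer) {
        if (!saslServer.isComplete())
            throw new IllegalStateException("SASL authentication is not complete");
        // Returns null if the mechanism did not negotiate a token.
        return (OAuthBearerToken) saslServer.getNegotiatedProperty(OAUTHBEARER_TOKEN_PROPERTY);
    }
}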

Regards,

Rajini


On Thu, Feb 22, 2018 at 4:16 AM, Ron Dagostino  wrote:

> Hi everyone.  I implemented the ability to perform substitution in JAAS
> config module options, which was the only part of KIP 255 that was not
> implemented when I originally published the KIP last week.  I have made
> adjustments to that section of the KIP based on this implementation
> experience, including detailing how to add custom substitutions beyond the
> 4 built-in ones (file, system property, environment variable, and option
> substitution).  See
> https://github.com/rondagostino/kafka/commit/548a95822b06b60
> a92745084a3980d72295d2ce6
> (code coverage 82%) for the detailed changes.  The KIP code blocks are also
> up-to-date.
>
> Ron
>
>
> On Wed, Feb 14, 2018 at 8:11 PM, Ron Dagostino  wrote:
>
> > Thanks, Ted.  I've added the JIRA and mailing list links to the KIP, and
> I
> > added Javadoc addressing your questions -- both in the KIP code blocks
> and
> > on GitHub (https://github.com/rondagostino/kafka/commit/
> > c61f5bafad810b620ff1ebd04e1231d245183e36).
> >
> > Ron
> >
> > On Wed, Feb 14, 2018 at 7:19 PM, Ted Yu  wrote:
> >
> >> Nicely written KIP.
> >>
> >> Can you add link to this thread and fill in JIRA number ?
> >>
> >> For ExpiringCredential, why does expireTimeMillis() return long while
> >> other
> >> methods return Long ?
> >> Can you add some comment for WindowJitter in RefreshConfig ?
> >>
> >> Thanks
> >>
> >> On Wed, Feb 14, 2018 at 3:38 PM, Ron Dagostino 
> wrote:
> >>
> >> > Hi everyone.
> >> >
> >> > I created KIP-255: OAuth Authentication via SASL/OAUTHBEARER
> >> > (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=75968876
> >> > ).
> >> >
> >> > This KIP proposes adding the 

Jenkins build is back to normal : kafka-trunk-jdk8 #2457

2018-03-08 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-250 Add Support for Quorum-based Producer Acknowledgment

2018-03-08 Thread Jason Gustafson
Initially I thought this proposal was just about improving producer
latency. So acks=quorum (or whatever) was sort of the midway point between
acks=1 and acks=all, but offsets would only be exposed according to current
high watermark semantics (meaning full ISR replication). However, it looks
like we are also trying to allow messages to be exposed to consumers as
soon as we have achieved replication to a quorum. In this case, I don't
think the ISR leader election makes sense any longer unless we're willing
to accept consumers seeing uncommitted data (which seems like a bad
regression to me). Probably the only way you can avoid it is to also
require a quorum for leader election. But maybe that's not so bad? If a
quorum is not present, producers wouldn't be able to make progress anyway,
so the only advantage of allowing election from a minority of replicas is
potentially the ability to serve old reads. That should be possible from
any replica anyway as long as we know that the data has been committed. So
effectively we would be giving users a 2f + 1 option. And I agree with
Colin that if we had that, future work could investigate moving the leader
election out of the controller.
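
To make the 2f + 1 arithmetic concrete (as a worked illustration, not part of
the proposal itself): with 5 replicas (f = 2) and a required quorum of 3, a
produce is acknowledged once any 3 replicas have the record, and a new leader
is elected only with the agreement of 3 live replicas. Since any two sets of 3
out of 5 overlap in at least one replica, the election quorum always contains
at least one copy of every acknowledged write, so up to 2 replica failures can
be tolerated without losing acknowledged data.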

-Jason

On Tue, Mar 6, 2018 at 6:45 PM, Jun Rao  wrote:

> Hi, Colin,
>
> A couple of thoughts on your previous comments.
>
> 1. I am not sure if the controller is necessarily a bottleneck for leader
> election. In the common case, leader election is due to a single broker
> failure. So, the controller only needs to change the leaders for the number
> of partitions on that broker, not for the whole cluster. Also, the
> controller can do the leader election for all affected partitions in a
> batch fashion and communicate the new leader for all partitions in a single
> RPC request.
>
> 2. Keeping the metadata such as ISR in ZK in the quorum mode has some
> benefits. Suppose that a user wants to wait for a quorum of 2 out of 4
> replicas. At some point, replicas A and B are in sync, and replicas C and D
> are out of sync. Later on, replicas A and B fail. By checking ISR, we can
> choose not to elect a new leader from replicas C and D to avoid unclean
> leader election. If we don't maintain ISR, this would be hard.
>
> Hi, Litao,
>
> At the high level, what you outlined makes sense. A few more comments.
>
> 10. It would be useful to clarify the semantics of the new configs. With a
> quorum size of Q, I guess we want to wait until Q in-sync replicas have
> received the message before exposing it to a consumer and acknowledging the
> producer if acks=quorum is specified. If the producer uses acks=0 or 1, it
> seems that the current semantic applies. However, if the producer uses
> acks=all while the topic is configured with quorum, or if the producer
> users acks=quorum while the topic is configured without quorum, should we
> just reject the request?
>
> 11. Just having a boolean enable.quorum.acks in the topic config is not
> enough, right? It seems that we need an integer value for the quorum size.
> It
> would be useful to see which set of configurations makes more intuitive sense.
>
> For example, one option is to add sth like "quorum.required.acks" of type
> integer. Then, should we allow both quorum.required.acks and min.isr set on
> the same topic?
>
> Another option is to have a boolean config such as "enable.quorum.acks",
> which defaults to false. If it's enabled, we just reuse min.isr as the
> quorum size.
>
> 12. About the fencing issue that Guozhang mentioned. This is a potential
> issue in the quorum mode. There are a few ways that we can improve this.
> One approach is to force some kind of regular heartbeat between the broker
> and the controller. If a broker hasn't received a heartbeat from a
> controller for some time, the broker will pause any future writes on its
> leaders until the heartbeat is resumed. Another approach is that when there
> is log divergence, we could save the data to be truncated in the follower
> to a separate file and provide an external tool for users to retrieve these
> data, if they are needed. These improvements can potentially be done in
> future KIPs.
>
> Thanks,
>
> Jun
>
> On Mon, Feb 12, 2018 at 3:02 PM, Litao Deng  >
> wrote:
>
> > Folks. Thanks for all of the good discussions.
> >
> > Here are a few of my thoughts:
> >
> >1. we won't change any existing semantics. That means, besides acks
> '-1
> >(all)', '0', '1', we will introduce a separate 'quorum' and document
> the
> >semantic. 'quorum' is a totally different view of our replication
> > protocol
> >for the sake of better tail (P999) latency. I will advertise don't
> > compare
> >'quorum' with '-1 (all)' and any other existing values.
> >2. in terms of the controller functionality, I admit there are many
> >awesome consensus protocols; however, for this specific KIP, I choose
> to
> >minimize the impact/change on the controller code path.
> >   - 

[jira] [Created] (KAFKA-6627) Console producer default config values override explicitly provided properties

2018-03-08 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6627:
--

 Summary: Console producer default config values override 
explicitly provided properties
 Key: KAFKA-6627
 URL: https://issues.apache.org/jira/browse/KAFKA-6627
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: Jason Gustafson


Some producer properties can be provided through custom parameters (e.g. 
--request-required-acks) and explicitly through 
--producer-property. At the moment, some of the custom parameters have 
default values which actually override explicitly provided properties. For 
example, if you set --producer-property acks=all when starting the 
console producer, the argument will be ignored since 
--request-required-acks has a default value. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] 1.1.0 RC1

2018-03-08 Thread Jeff Chao
Hello,

We at Heroku have run 1.1.0 RC1 through our normal performance and
regression test suite and have found performance to be comparable to 1.0.0.

That said, we're however -1 (non-binding) since this release includes
ZooKeeper 3.4.11, which
is affected by the critical regression ZOOKEEPER-2960. As 3.4.12 isn't
released yet, it might be better to have 3.4.10 included instead.

Jeff
Heroku


On Tue, Mar 6, 2018 at 1:19 PM, Ted Yu  wrote:

> +1
>
> Checked signature
> Ran test suite - apart from flaky testMetricsLeak, other tests passed.
>
> On Tue, Mar 6, 2018 at 2:45 AM, Damian Guy  wrote:
>
> > Hello Kafka users, developers and client-developers,
> >
> > This is the second candidate for release of Apache Kafka 1.1.0.
> >
> > This is a minor version release of Apache Kafka. It includes 29 new KIPs.
> > Please see the release plan for more details:
> >
> > https://cwiki.apache.org/confluence/pages/viewpage.
> action?pageId=71764913
> >
> > A few highlights:
> >
> > * Significant Controller improvements (much faster and session expiration
> > edge cases fixed)
> > * Data balancing across log directories (JBOD)
> > * More efficient replication when the number of partitions is large
> > * Dynamic Broker Configs
> > * Delegation tokens (KIP-48)
> > * Kafka Streams API improvements (KIP-205 / 210 / 220 / 224 / 239)
> >
> > Release notes for the 1.1.0 release:
> > http://home.apache.org/~damianguy/kafka-1.1.0-rc1/RELEASE_NOTES.html
> >
> > *** Please download, test and vote by Friday, March 9th, 5pm PT
> >
> > Kafka's KEYS file containing PGP keys we use to sign the release:
> > http://kafka.apache.org/KEYS
> >
> > * Release artifacts to be voted upon (source and binary):
> > http://home.apache.org/~damianguy/kafka-1.1.0-rc1/
> >
> > * Maven artifacts to be voted upon:
> > https://repository.apache.org/content/groups/staging/
> >
> > * Javadoc:
> > http://home.apache.org/~damianguy/kafka-1.1.0-rc1/javadoc/
> >
> > * Tag to be voted upon (off 1.1 branch) is the 1.1.0 tag:
> > https://github.com/apache/kafka/tree/1.1.0-rc1
> >
> >
> > * Documentation:
> > http://kafka.apache.org/11/documentation.html
> >
> > * Protocol:
> > http://kafka.apache.org/11/protocol.html
> >
> > * Successful Jenkins builds for the 1.1 branch:
> > Unit/integration tests: https://builds.apache.org/job/kafka-1.1-jdk7/68
> > System tests: https://jenkins.confluent.io/
> job/system-test-kafka/job/1.1/
> > 30/
> >
> > /**
> >
> > Thanks,
> > Damian Guy
> >
>


Re: [VOTE] KIP-265: Make Windowed Serde to public APIs

2018-03-08 Thread Damian Guy
Thanks Guozhang +1

On Tue, 6 Mar 2018 at 00:26 Hu Xi  wrote:

> +1 (non-binding)
>
> 
> From: Matthias J. Sax
> Sent: March 6, 2018, 8:19
> To: dev@kafka.apache.org
> Subject: Re: [VOTE] KIP-265: Make Windowed Serde to public APIs
>
> +1 (binding)
>
> Thanks for the KIP Guozhang!
>
> -Matthias
>
> On 3/5/18 2:41 PM, Bill Bejeck wrote:
> > Thanks for the KIP Guozhang.
> >
> > +1
> >
> > -Bill
> >
> > On Mon, Mar 5, 2018 at 5:11 PM, Ted Yu  wrote:
> >
> >> +1
> >>
> >> On Mon, Mar 5, 2018 at 1:46 PM, Guozhang Wang 
> wrote:
> >>
> >>> Hello all,
> >>>
> >>> I'd like to start voting on KIP-265, on making windowed serde to public
> >>> APIs of Kafka Streams. It involves a couple of new configs, plus a few
> >> new
> >>> public classes for windowed serializer and deserializer, and also
> adding
> >>> the corresponding console consumer options in order to fetch from a
> topic
> >>> written by a windowed store.
> >>>
> >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >>> 265%3A+Make+Windowed+Serde+to+public+APIs
> >>>
> >>>
> >>>
> >>> Thanks,
> >>> -- Guozhang
> >>>
> >>
> >
>
>


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-08 Thread Dong Lin
Hey Jan,

Sorry for the frustration. I haven't finished replying to all comments. For
example in my last email it is mentioned that "I will reply after I finish
reading the documentation and code". It takes time to think through
comments thoroughly. I have been busy with my daily work and haven't had
time to reply to all comments.

I have some quick replies to your comments. It is true that some space will be
wasted with the current approach in the KIP for log-compacted topics. But
it does not actually affect correctness. If the goal is to save space for
log-compacted topics after a partition change, there are a couple of other
approaches, as Jun mentioned. And these can be done separately outside this
KIP. Even if we use the data copying approach to address the problem for
log-compacted topics, it may still be worth using linear hashing to address the
problem for non-log-compacted topics, to avoid copying a large amount of data.
What do you think? Can you also take a look at Jun's comments?

Thanks,
Dong


On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak 
wrote:

> Hi Dong,
>
> are you actually reading my emails, or are you just using the thread I
> started for general announcements regarding the KIP?
>
> I tried to argue really hard against linear hashing. Growing the topic by
> an integer factor does not require any state redistribution at all. I fail
> to see completely where linear hashing helps on log compacted topics.
>
> If you are not willing to explain to me what I might be overlooking: that
> is fine.
> But I ask you to not reply to my emails then. Please understand my
> frustration with this.
>
> Best Jan
>
>
>
> On 06.03.2018 19:38, Dong Lin wrote:
>
>> Hi everyone,
>>
>> Thanks for all the comments! It appears that everyone prefers linear
>> hashing because it reduces the amount of state that needs to be moved
>> between consumers (for stream processing). The KIP has been updated to use
>> linear hashing.
>>
>> Regarding the migration endeavor: it seems that migrating producer library
>> to use linear hashing should be pretty straightforward without
>> much operational endeavor. If we don't upgrade client library to use this
>> KIP, we can not support in-order delivery after partition is changed
>> anyway. Suppose we upgrade client library to use this KIP, if partition
>> number is not changed, the key -> partition mapping will be exactly the
>> same as it is now because it is still determined using murmur_hash(key) %
>> original_partition_num. In other words, this change is backward
>> compatible.
>>
>> Regarding the load distribution: if we use linear hashing, the load may be
>> unevenly distributed because those partitions which are not split may
>> receive twice as much traffic as other partitions that are split. This
>> issue can be mitigated by creating topic with partitions that are several
>> times the number of consumers. And there will be no imbalance if the
>> partition number is always doubled. So this imbalance seems acceptable.
>>
>> Regarding storing the partition strategy as per-topic config: It seems not
>> necessary since we can still use murmur_hash as the default hash function
>> and additionally apply the linear hashing algorithm if the partition
>> number
>> has increased. Not sure if there is any use-case for producer to use a
>> different hash function. Jason, can you check if there is some use-case
>> that I missed for using the per-topic partition strategy?
>>
>> Regarding how to reduce latency (due to state store/load) in stream
>> processing consumer when partition number changes: I need to read the
>> Kafka
>> Stream code to understand how Kafka Stream currently migrate state between
>> consumers when the application is added/removed for a given job. I will
>> reply after I finish reading the documentation and code.
>>
>>
>> Thanks,
>> Dong
>>
>>
>> On Mon, Mar 5, 2018 at 10:43 AM, Jason Gustafson 
>> wrote:
>>
>> Great discussion. I think I'm wondering whether we can continue to leave
>>> Kafka agnostic to the partitioning strategy. The challenge is
>>> communicating
>>> the partitioning logic from producers to consumers so that the
>>> dependencies
>>> between each epoch can be determined. For the sake of discussion, imagine
>>> you did something like the following:
>>>
>>> 1. The name (and perhaps version) of a partitioning strategy is stored in
>>> topic configuration when a topic is created.
>>> 2. The producer looks up the partitioning strategy before writing to a
>>> topic and includes it in the produce request (for fencing). If it doesn't
>>> have an implementation for the configured strategy, it fails.
>>> 3. The consumer also looks up the partitioning strategy and uses it to
>>> determine dependencies when reading a new epoch. It could either fail or
>>> make the most conservative dependency assumptions if it doesn't know how
>>> to
>>> implement the partitioning strategy. For the consumer, the new interface
>>> might look 

Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

2018-03-08 Thread Guozhang Wang
Hello John,

Thanks for the KIP. I made a pass over the wiki page and here are some
comments:

1. Meta-comment: there is an internal class MockProcessorContext under the
o.a.k.test package, which should be replaced as part of this KIP.

2. In @Override StreamsMetrics metrics(), will you return a fully created
StreamsMetricsImpl object or are you planning to use the
MockStreamsMetrics? Note that for the latter case you probably need to look
into https://issues.apache.org/jira/browse/KAFKA-5676 as well.

3. Not related to the KIP changes themselves: about
"context.scheduledPunctuators": we need to document clearly that in the
MockProcessorContext the scheduled punctuators will never be auto-triggered,
and hence it only lets people test that their code has indeed registered some
punctuators; if people want full, automatic punctuation testing they
have to go with TopologyTestDriver.
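
To make point 3 concrete, a unit test against the mock can only assert that a
punctuator was registered, roughly along the lines below. MockProcessorContext
and scheduledPunctuators() refer to the API proposed in this KIP (the
constructor signature here is an assumption, and its package is still under
discussion in this thread), and MyProcessor is a made-up processor under test.

import static org.junit.Assert.assertFalse;

import java.util.Properties;
import org.junit.Test;

public class MyProcessorTest {

    @Test
    public void shouldRegisterPunctuatorOnInit() {
        // The mock never auto-fires punctuators, so we only check registration here;
        // time-driven punctuation testing needs the TopologyTestDriver instead.
        MockProcessorContext context = new MockProcessorContext(new Properties());
        MyProcessor processor = new MyProcessor();
        processor.init(context);

        assertFalse(context.scheduledPunctuators().isEmpty());
    }
}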



Guozhang


On Wed, Mar 7, 2018 at 8:04 PM, John Roesler  wrote:

> On Wed, Mar 7, 2018 at 8:03 PM, John Roesler  wrote:
>
> > Thanks Ted,
> >
> > Sure thing; I updated the example code in the KIP with a little snippet.
> >
> > -John
> >
> > On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu  wrote:
> >
> >> Looks good.
> >>
> >> See if you can add punctuator into the sample code.
> >>
> >> On Wed, Mar 7, 2018 at 7:10 PM, John Roesler  wrote:
> >>
> >> > Dear Kafka community,
> >> >
> >> > I am proposing KIP-267 to augment the public Streams test utils API.
> >> > The goal is to simplify testing of Kafka Streams applications.
> >> >
> >> > Please find details in the
> >> > wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> > 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils
> >> >
> >> > An initial WIP PR can be found here:https://github.com/
> >> > apache/kafka/pull/4662
> >> >
> >> > I also included the user-list (please hit "reply-all" to include both
> >> > lists in this KIP discussion).
> >> >
> >> > Thanks,
> >> >
> >> > -John
> >> >
> >>
> >
> >
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-03-08 Thread Guozhang Wang
Hello Matthias, thanks for the KIP.

I've read through the upgrade path section and it looks good to me. If you
already have a WIP PR for it, could you also share it here so that people
can take a look?

I'm +1 on the KIP itself. But with large KIPs like this there is always some
devil hidden in the details, so I think it is better to have the
implementation in parallel along with the design discussion :)


Guozhang


On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax 
wrote:

> Hi,
>
> I want to propose KIP-258 for the Streams API to allow storing
> timestamps in RocksDB. This feature is the basis to resolve multiple
> tickets (issues and feature requests).
>
> Looking forward to your comments about this!
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
>
>
> -Matthias
>
>
>


-- 
-- Guozhang


[jira] [Created] (KAFKA-6628) RocksDBSegmentedBytesStoreTest does not cover time window serdes

2018-03-08 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6628:


 Summary: RocksDBSegmentedBytesStoreTest does not cover time window 
serdes
 Key: KAFKA-6628
 URL: https://issues.apache.org/jira/browse/KAFKA-6628
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


The RocksDBSegmentedBytesStoreTest.java only covers session window serdes, but 
not time window serdes. We should fill in this coverage gap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6630) Speed up the processing of StopReplicaResponse events on the controller

2018-03-08 Thread Lucas Wang (JIRA)
Lucas Wang created KAFKA-6630:
-

 Summary: Speed up the processing of StopReplicaResponse events on 
the controller
 Key: KAFKA-6630
 URL: https://issues.apache.org/jira/browse/KAFKA-6630
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Lucas Wang
Assignee: Lucas Wang


Problem Statement:
We find that in a large cluster with many partition replicas, it takes a long time 
to successfully delete a topic. 

Root cause:
Further analysis shows that for a topic with N replicas, the controller 
receives all N StopReplicaResponses from brokers within a short time; however, 
sequentially handling all N 
TopicDeletionStopReplicaResponseReceived events one by one takes a long time.

Specifically the functions triggered while handling every single 
TopicDeletionStopReplicaResponseReceived event include:
TopicDeletionStopReplicaResponseReceived.process calls 
TopicDeletionManager.completeReplicaDeletion, which calls 
TopicDeletionManager.resumeDeletions, which calls several inefficient functions.

The inefficient functions called inside TopicDeletionManager.resumeDeletions 
include
ReplicaStateMachine.areAllReplicasForTopicDeleted
ReplicaStateMachine.isAtLeastOneReplicaInDeletionStartedState
ReplicaStateMachine.replicasInState

Each of the 3 inefficient functions above will iterate through all the replicas 
in the cluster, and filter out the replicas belonging to a topic. In a large 
cluster with many replicas, these functions can be quite slow. 

Total deletion time for a topic becomes long in single threaded controller 
processing model:
Since the controller needs to sequentially process the queued 
TopicDeletionStopReplicaResponseReceived events, if the time cost to process 
one event is t, the total time to process all events for all replicas of a 
topic is N * t.
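
A schematic sketch of why each event is expensive is below. This is not the
actual ReplicaStateMachine code (PartitionReplica and ReplicaState are
placeholder types); it just mirrors the pattern of the three functions named
above: every call scans all R replicas in the cluster and filters by topic, so
deleting a topic with N replicas costs roughly O(N * R) in total.

import java.util.Collection;

// Placeholder types standing in for the controller's internal representations.
interface PartitionReplica { String topic(); ReplicaState state(); }
enum ReplicaState { DELETION_STARTED, DELETION_SUCCESSFUL }

class DeletionCheck {
    // Called once per TopicDeletionStopReplicaResponseReceived event; each call
    // does a full scan of the cluster's replicas and filters by topic.
    static boolean areAllReplicasForTopicDeleted(String topic,
                                                 Collection<PartitionReplica> allReplicasInCluster) {
        return allReplicasInCluster.stream()
                .filter(replica -> replica.topic().equals(topic))
                .allMatch(replica -> replica.state() == ReplicaState.DELETION_SUCCESSFUL);
    }
}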




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6629) SegmentedCacheFunctionTest does not cover session window serdes

2018-03-08 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6629:


 Summary: SegmentedCacheFunctionTest does not cover session window 
serdes
 Key: KAFKA-6629
 URL: https://issues.apache.org/jira/browse/KAFKA-6629
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


The SegmentedCacheFunctionTest.java only covers time window serdes, but not 
session window serdes. We should fill in this coverage gap.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-03-08 Thread John Roesler
Hey Matthias,

The KIP looks good to me. I had several questions queued up, but they were
all in the "rejected alternatives" section... oh, well.

One very minor thought re changing the state directory from
"/<state.dir>/<application.id>/<task.id>/rocksdb/storeName/" to
"/<state.dir>/<application.id>/<task.id>/rocksdb-v2/storeName/": if you put the "v2"
marker on the storeName part of the path (i.e.,
"/<state.dir>/<application.id>/<task.id>/rocksdb/storeName-v2/"), then you get the same
benefits without altering the high-level directory structure.

It may not matter, but I could imagine people running scripts to monitor
rocksdb disk usage for each task, or other such use cases.

Thanks,
-John

On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu  wrote:

> Matthias:
> Nicely written KIP.
>
> "in_place" : can this be "in-place" ? Underscore may sometimes be miss
> typed (as '-'). I think using '-' is more friendly to user.
>
> public interface ReadOnlyKeyValueTimestampStore {
>
> Is ReadOnlyKeyValueStoreWithTimestamp better name for the class ?
>
> Thanks
>
> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang  wrote:
>
> > Hello Matthias, thanks for the KIP.
> >
> > I've read through the upgrade patch section and it looks good to me, if
> you
> > already have a WIP PR for it could you also share it here so that people
> > can take a look?
> >
> > I'm +1 on the KIP itself. But large KIPs like this there are always some
> > devil hidden in the details, so I think it is better to have the
> > implementation in parallel along with the design discussion :)
> >
> >
> > Guozhang
> >
> >
> > On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax 
> > wrote:
> >
> > > Hi,
> > >
> > > I want to propose KIP-258 for the Streams API to allow storing
> > > timestamps in RocksDB. This feature is the basis to resolve multiple
> > > tickets (issues and feature requests).
> > >
> > > Looking forward to your comments about this!
> > >
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> > >
> > >
> > > -Matthias
> > >
> > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


[jira] [Created] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0

2018-03-08 Thread Alexander Ivanichev (JIRA)
Alexander Ivanichev created KAFKA-6631:
--

 Summary: Kafka Streams - Rebalancing exception in Kafka 1.0.0
 Key: KAFKA-6631
 URL: https://issues.apache.org/jira/browse/KAFKA-6631
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
 Environment: Container Linux by CoreOS 1576.5.0
Reporter: Alexander Ivanichev


 
In Kafka Streams 1.0.0, we saw a strange rebalance error. Our stream app 
performs window-based aggregations. Sometimes on start, when all stream workers 
join, the app just crashes; however, if we enable only one worker then it works 
fine. Sometimes 2 workers work just fine, but when a third joins the app crashes 
again: some critical issue with rebalance.
{code:java}
018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: 
Unexpected error from SyncGroup: The server experienced an unexpected error 
when processing the request
2018-03-08T18:51:01.226557000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
2018-03-08T18:51:01.22686Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
2018-03-08T18:51:01.227328000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
2018-03-08T18:51:01.22763Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
2018-03-08T18:51:01.228152000Z at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
2018-03-08T18:51:01.228449000Z at 
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
2018-03-08T18:51:01.228897000Z at 
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
2018-03-08T18:51:01.229196000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
2018-03-08T18:51:01.229673000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
2018-03-08T18:51:01.229971000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
2018-03-08T18:51:01.230436000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
2018-03-08T18:51:01.230749000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
2018-03-08T18:51:01.231065000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
2018-03-08T18:51:01.231584000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
2018-03-08T18:51:01.231911000Z at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
2018-03-08T18:51:01.23219Z at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
2018-03-08T18:51:01.232643000Z at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
2018-03-08T18:51:01.233121000Z at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
2018-03-08T18:51:01.233409000Z at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
2018-03-08T18:51:01.23372Z at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
2018-03-08T18:51:01.234196000Z at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
2018-03-08T18:51:01.234655000Z org.apache.kafka.common.KafkaException: 
Unexpected error from SyncGroup: The server experienced an unexpected error 
when processing the request
2018-03-08T18:51:01.234972000Z exception in thread, closing process
2018-03-08T18:51:01.23550Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
2018-03-08T18:51:01.235839000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
2018-03-08T18:51:01.236336000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
2018-03-08T18:51:01.236603000Z at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
2018-03-08T18:51:01.236889000Z at 
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)

Re: [VOTE] KIP-265: Make Windowed Serde to public APIs

2018-03-08 Thread Guozhang Wang
+1 from myself as well.


I'm closing the vote for KIP-265 with the following results:

binding +1: Matthias, Damian, Guozhang
non-binding +1: Xi, Ted, Bill


Thanks everyone for the votes.


Guozhang


On Thu, Mar 8, 2018 at 11:18 AM, Damian Guy  wrote:

> Thanks Guozhang +1
>
> On Tue, 6 Mar 2018 at 00:26 Hu Xi  wrote:
>
> > +1 (non-binding)
> >
> > 
> > From: Matthias J. Sax
> > Sent: March 6, 2018, 8:19
> > To: dev@kafka.apache.org
> > Subject: Re: [VOTE] KIP-265: Make Windowed Serde to public APIs
> >
> > +1 (binding)
> >
> > Thanks for the KIP Guozhang!
> >
> > -Matthias
> >
> > On 3/5/18 2:41 PM, Bill Bejeck wrote:
> > > Thanks for the KIP Guozhang.
> > >
> > > +1
> > >
> > > -Bill
> > >
> > > On Mon, Mar 5, 2018 at 5:11 PM, Ted Yu  wrote:
> > >
> > >> +1
> > >>
> > >> On Mon, Mar 5, 2018 at 1:46 PM, Guozhang Wang 
> > wrote:
> > >>
> > >>> Hello all,
> > >>>
> > >>> I'd like to start voting on KIP-265, on making windowed serde to
> public
> > >>> APIs of Kafka Streams. It involves a couple of new configs, plus a
> few
> > >> new
> > >>> public classes for windowed serializer and deserializer, and also
> > adding
> > >>> the corresponding console consumer options in order to fetch from a
> > topic
> > >>> written by a windowed store.
> > >>>
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >>> 265%3A+Make+Windowed+Serde+to+public+APIs
> > >>>
> > >>>
> > >>>
> > >>> Thanks,
> > >>> -- Guozhang
> > >>>
> > >>
> > >
> >
> >
>



-- 
-- Guozhang


Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

2018-03-08 Thread John Roesler
Thanks, Matthias,

1. I can move it into the o.a.k.streams.processor package; that makes sense.

2. I'm expecting most users to use in-memory state stores, so they won't
need a state directory. In the "real" code path, the stateDir is extracted
from the config
by org.apache.kafka.streams.processor.internals.StateDirectory. The logic
is non-trivial and invoking it directly will result in the state directory
actually being created. Given my assumption that you don't need it most of
the time, creating directories seems too heavy to me.

3. I'm on the fence about that. It's not too much trouble to implement it,
even if it is deprecated from day 1, so I think I'd rather put it in and
let us remove it later when we actually remove the deprecated method. In
contrast, we actually would have to jump through some hoops to support
schedule(interval).

On Thu, Mar 8, 2018 at 3:36 PM, Matthias J. Sax 
wrote:

> Thanks for the KIP John.
>
> Couple of minor questions:
>
> - What about putting the mock into sub-package `processor` so it's in
> the same package name as the interface it implements?
>
> - What is the purpose of the constructor taking the `File stateDir`
> argument? The state directory should be encoded in the `Properties
> config' parameter already.
>
> - We have KIP-251 in place (not voted yet though) that plans to
> deprecate `forward(K key, V value, int childIndex)` and `forward(K key,
> V value, String childName)`  -- should we also throw
> UnsupportedOperationException similar to `schedule(long)` if KIP-251 is
> accepted?
>
>
> -Matthias
>
> On 3/8/18 3:16 PM, John Roesler wrote:
> > Thanks for the review, Guozhang,
> >
> > In response:
> > 1. I missed that! I'll look into it and update the KIP.
> >
> > 2. I was planning to use the real implementation, since folks might
> > register some metrics in the processors and want to verify the values
> that
> > get recorded. If the concern is about initializing all the stuff that's
> in
> > the Metrics object, I can instantiate it lazily or even make it optional
> by
> > taking a nullable constructor parameter.
> >
> > 3. Agreed. I think that's the real sharp edge here. I actually think it
> > would be neat to auto-trigger those scheduled punctuators, but it seems
> > like that moves this component out of "mock" territory and into "driver"
> > territory. Since we already have the TopologyTestDriver, I'd prefer to
> > focus on keeping the mock lean. I agree it should be in the javadoc as
> well
> > as the web documentation.
> >
> > Thanks,
> > -John
> >
> > On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang 
> wrote:
> >
> >> Hello John,
> >>
> >> Thanks for the KIP. I made a pass over the wiki page and here are some
> >> comments:
> >>
> >> 1. Meta-comment: there is an internal class MockProcessorContext under
> the
> >> o.a.k.test package, which should be replaced as part of this KIP.
> >>
> >> 2. In @Override StreamsMetrics metrics(), will you return a fully
> created
> >> StreamsMetricsImpl object or are you planning to use the
> >> MockStreamsMetrics? Note that for the latter case you probably need to
> look
> >> into https://issues.apache.org/jira/browse/KAFKA-5676 as well.
> >>
> >> 3. Not related to the KIP changes themselves: about
> >> "context.scheduledPunctuators": we need to well document that in the
> >> MockProcessorContext the scheduled punctuator will never by
> auto-triggered,
> >> and hence it is only for testing people's code that some punctuators are
> >> indeed registered, and if people want full auto punctuation testing they
> >> have to go with TopologyTestDriver.
> >>
> >>
> >>
> >> Guozhang
> >>
> >>
> >> On Wed, Mar 7, 2018 at 8:04 PM, John Roesler  wrote:
> >>
> >>> On Wed, Mar 7, 2018 at 8:03 PM, John Roesler 
> wrote:
> >>>
>  Thanks Ted,
> 
>  Sure thing; I updated the example code in the KIP with a little
> >> snippet.
> 
>  -John
> 
>  On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu  wrote:
> 
> > Looks good.
> >
> > See if you can add punctuator into the sample code.
> >
> > On Wed, Mar 7, 2018 at 7:10 PM, John Roesler 
> >> wrote:
> >
> >> Dear Kafka community,
> >>
> >> I am proposing KIP-267 to augment the public Streams test utils API.
> >> The goal is to simplify testing of Kafka Streams applications.
> >>
> >> Please find details in the
> >> wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> >> 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils
> >>
> >> An initial WIP PR can be found here:https://github.com/
> >> apache/kafka/pull/4662
> >>
> >> I also included the user-list (please hit "reply-all" to include
> >> both
> >> lists in this KIP discussion).
> >>
> >> Thanks,
> >>
> >> -John
> >>
> >
> 
> 
> >>>
> >>
> >>
> >>
> >> --
> >> -- 

Re: [VOTE] KIP-251: Allow timestamp manipulation in Processor API

2018-03-08 Thread Matthias J. Sax
Guozhang,

I updated the code slightly to avoid object creation and I did some perf
investigations.

1) JMH Benchmark with the below topology, using TopologyTestDriver to
pipe data through the topology:

> StreamsBuilder builder = new StreamsBuilder();
> KStream<Object, Object> stream = builder.<Object, Object>stream("topic")
>     .transform(new TransformerSupplier<Object, Object, KeyValue<Object, Object>>() {
>         @Override
>         public Transformer<Object, Object, KeyValue<Object, Object>> get() {
>             return new Transformer<Object, Object, KeyValue<Object, Object>>() {
>                 ProcessorContext context;
> 
>                 @Override
>                 public void init(ProcessorContext context) {
>                     this.context = context;
>                 }
> 
>                 @Override
>                 public KeyValue<Object, Object> transform(Object key, Object value) {
>                     context.forward(key, value);
>                     return null;
>                 }
> 
>                 @Override
>                 public KeyValue<Object, Object> punctuate(long timestamp) {
>                     return null;
>                 }
> 
>                 @Override
>                 public void close() {}
>             };
>         }
>     });

I run this with zero, one, and five downstream nodes like:

> stream.foreach(new ForeachAction<Object, Object>() {
>     @Override
>     public void apply(Object key, Object value) {}
> });

On `trunk` I get the following numbers (5 warmup iterations, 15 test
iterations)

Zero Downstream Nodes:

> # Run complete. Total time: 00:00:20
> 
> Benchmark Mode  CntScore   Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  2246686.693 ± 56372.920  ops/s

One Downstream Node:

> # Run complete. Total time: 00:00:20
> 
> Benchmark Mode  CntScore   Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  2206277.298 ± 51855.465  ops/s

Five Downstream Nodes:

> # Run complete. Total time: 00:00:20
> 
> Benchmark Mode  CntScore   Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  1855833.516 ± 46901.811  ops/s


Repeating the same on my PR branch I get the following numbers:

Zero Downstream Nodes:

> # Run complete. Total time: 00:00:20
> 
> Benchmark Mode  CntScore   Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  2192891.762 ± 77598.908  ops/s

One Downstream Node:

> # Run complete. Total time: 00:00:20
> 
> Benchmark Mode  CntScore   Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  2190676.716 ± 77030.594  ops/s

Five Downstream Nodes:

> # Run complete. Total time: 00:00:20
> 
> Benchmark Mode  CntScore   Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  1921632.144 ± 66276.232  ops/s


I also had a look at GC and did not observe any issues. The objects that
get created are all in the young gen, and thus cleaning them up is cheap.

Let me know if this addresses your concerns.


-Matthias




On 2/11/18 9:36 PM, Guozhang Wang wrote:
> Hi Matthias,
> 
> Just clarifying a meta question alongside my vote: we still need to
> understand the overhead of the `To` objects during runtime to determine
> whether we would use it in the final proposal or use overloaded
> functions. Right?
> 
> 
> Guozhang
> 
> On Sun, Feb 11, 2018 at 9:33 PM, Guozhang Wang  wrote:
> 
>> +1
>>
>> On Fri, Feb 9, 2018 at 5:31 AM, Bill Bejeck  wrote:
>>
>>> Thanks for the KIP, +1 for me.
>>>
>>> -Bill
>>>
>>> On Fri, Feb 9, 2018 at 6:45 AM, Damian Guy  wrote:
>>>
 Thanks Matthias, +1

 On Fri, 9 Feb 2018 at 02:42 Ted Yu  wrote:

> +1
>  Original message From: "Matthias J. Sax" <
> matth...@confluent.io> Date: 2/8/18  6:05 PM  (GMT-08:00) To:
> dev@kafka.apache.org Subject: [VOTE] KIP-251: Allow timestamp
> manipulation in Processor API
> Hi,
>
> I want to start the vote for KIP-251:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
 251%3A+Allow+timestamp+manipulation+in+Processor+API
>
>
> -Matthias
>
>

>>>
>>
>>
>>
>> --
>> -- Guozhang
>>
> 
> 
> 





Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

2018-03-08 Thread John Roesler
I think what you're suggesting is to:
1. compile the main streams code, but not the tests
2. compile test-utils (and compile and run the test-utils tests)
3. compile and run the streams tests

This works in theory, since the test-utils depends on the main streams
code but not the streams tests, and the streams tests depend on test-utils
while the main streams code does not.

But after poking around a bit and reading up on it, I think this is not
possible, or at least not mainstream.

The issue is that dependencies are formed between projects, in this case
streams and streams:test-utils. The upstream project must be built before
the dependant one, regardless of whether the dependency is for compiling
the main code or the test code. This means we do have a circular dependency
on our hands if we want the tests in streams to use the test-utils, since
each would have to be built before the other.

Gradle seems to be quite scriptable, so there may be some way to achieve
this, but increasing the complexity of the build also introduces a project
maintenance concern.

The MockProcessorContext itself is pretty simple, so I'm tempted to argue
that we should just have one for internal unit tests and another for
test-utils; however, this situation also afflicts KAFKA-6474, and the
TopologyTestDriver is not so trivial.

I think the best thing at this point is to go ahead and fold the test-utils
into the streams project. We can put it into a separate "testutils" package
to make it easy to identify which code is for test support and which code
is Kafka Streams. The biggest bummer about this suggestion is that it we
*just* introduced the test-utils artifact, so folks would to add that
artifact in 1.1 to write their tests and then have to drop it again in 1.2.

The other major solution is to create a new gradle project for the streams
unit tests, which depends on streams and test-utils and move all the
streams unit tests there. I'm pretty sure we can configure gradle just to
include this project for running tests and not actually package any
artifacts. This structure basically expresses your observation that the
test code is essentially a separate module from the main streams code.

Of course, I'm open to alternatives, especially if someone with more
experience in Gradle is aware of a solution.

Thanks,
-John


On Thu, Mar 8, 2018 at 3:39 PM, Matthias J. Sax 
wrote:

> Isn't MockProcessorContext in o.a.k.test part of the unit-test package
> but not the main package?
>
> This should resolve the dependency issue.
>
> -Matthias
>
> On 3/8/18 3:32 PM, John Roesler wrote:
> > Actually, replacing the MockProcessorContext in o.a.k.test could be a bit
> > tricky, since it would make the "streams" module depend on
> > "streams:test-utils", but "streams:test-utils" already depends on
> "streams".
> >
> > At first glance, it seems like the options are:
> > 1. leave the two separate implementations in place. This shouldn't be
> > underestimated, especially since our internal tests may need different
> > things from a mocked P.C. than our API users.
> > 2. move the public testing artifacts into the regular streams module
> > 3. move the unit tests for Streams into a third module that depends on
> both
> > streams and test-utils. Yuck!
> >
> > Thanks,
> > -John
> >
> > On Thu, Mar 8, 2018 at 3:16 PM, John Roesler  wrote:
> >
> >> Thanks for the review, Guozhang,
> >>
> >> In response:
> >> 1. I missed that! I'll look into it and update the KIP.
> >>
> >> 2. I was planning to use the real implementation, since folks might
> >> register some metrics in the processors and want to verify the values
> that
> >> get recorded. If the concern is about initializing all the stuff that's
> in
> >> the Metrics object, I can instantiate it lazily or even make it
> optional by
> >> taking a nullable constructor parameter.
> >>
> >> 3. Agreed. I think that's the real sharp edge here. I actually think it
> >> would be neat to auto-trigger those scheduled punctuators, but it seems
> >> like that moves this component out of "mock" territory and into "driver"
> >> territory. Since we already have the TopologyTestDriver, I'd prefer to
> >> focus on keeping the mock lean. I agree it should be in the javadoc as
> well
> >> as the web documentation.
> >>
> >> Thanks,
> >> -John
> >>
> >> On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang 
> wrote:
> >>
> >>> Hello John,
> >>>
> >>> Thanks for the KIP. I made a pass over the wiki page and here are some
> >>> comments:
> >>>
> >>> 1. Meta-comment: there is an internal class MockProcessorContext under
> the
> >>> o.a.k.test package, which should be replaced as part of this KIP.
> >>>
> >>> 2. In @Override StreamsMetrics metrics(), will you return a fully
> created
> >>> StreamsMetricsImpl object or are you planning to use the
> >>> MockStreamsMetrics? Note that for the latter case you probably need to
> 

Re: Request to be added to the contributor list so that i can assign JIRAs to myself

2018-03-08 Thread Matthias J. Sax
What is your user ID?

-Matthias

On 3/8/18 8:11 PM, Sanket Band wrote:
> Thanks
> Sanket
> 





Re: Request to be added to the contributor list so that i can assign JIRAs to myself

2018-03-08 Thread Matthias J. Sax
I meant your JIRA ID -- sorry for the confusion.


-Matthias

On 3/8/18 10:48 PM, Sanket Band wrote:
> you mean the github id ? it is sband
> 
> Thanks
> Sanket Band
> 
> On Fri, Mar 9, 2018 at 11:22 AM, Matthias J. Sax 
> wrote:
> 
>> What is your user ID?
>>
>> -Matthias
>>
>> On 3/8/18 8:11 PM, Sanket Band wrote:
>>> Thanks
>>> Sanket
>>>
>>
>>
> 





Build failed in Jenkins: kafka-trunk-jdk8 #2458

2018-03-08 Thread Apache Jenkins Server
See 


Changes:

[jason] MINOR: Fix incorrect references to the max transaction timeout config

[wangguoz] KAFKA-6560: [FOLLOW-UP] don't deserialize null byte array in window

--
[...truncated 3.51 MB...]
kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported
 PASSED

kafka.server.KafkaApisTest > 
shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition
 STARTED

kafka.server.KafkaApisTest > 
shouldRespondWithUnsupportedMessageFormatForBadPartitionAndNoErrorsForGoodPartition
 PASSED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported
 STARTED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleEndTxnRequestWhenInterBrokerProtocolNotSupported
 PASSED

kafka.server.KafkaApisTest > 
shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition
 STARTED

kafka.server.KafkaApisTest > 
shouldRespondWithUnknownTopicOrPartitionForBadPartitionAndNoErrorsForGoodPartition
 PASSED

kafka.server.DelegationTokenRequestsTest > testDelegationTokenRequests STARTED

kafka.server.DelegationTokenRequestsTest > testDelegationTokenRequests PASSED

kafka.server.ReplicaFetcherThreadFatalErrorTest > 
testFatalErrorInProcessFetchRequest STARTED

kafka.server.ReplicaFetcherThreadFatalErrorTest > 
testFatalErrorInProcessFetchRequest PASSED

kafka.server.ReplicaFetcherThreadFatalErrorTest > testFatalErrorInAddPartitions 
STARTED

kafka.server.ReplicaFetcherThreadFatalErrorTest > testFatalErrorInAddPartitions 
PASSED

kafka.server.ThrottledResponseExpirationTest > testThrottledRequest STARTED

kafka.server.ThrottledResponseExpirationTest > testThrottledRequest PASSED

kafka.server.ThrottledResponseExpirationTest > testExpire STARTED

kafka.server.ThrottledResponseExpirationTest > testExpire PASSED

kafka.server.ReplicationQuotaManagerTest > shouldThrottleOnlyDefinedReplicas 
STARTED

kafka.server.ReplicationQuotaManagerTest > shouldThrottleOnlyDefinedReplicas 
PASSED

kafka.server.ReplicationQuotaManagerTest > 
shouldSupportWildcardThrottledReplicas STARTED

kafka.server.ReplicationQuotaManagerTest > 
shouldSupportWildcardThrottledReplicas PASSED

kafka.server.ReplicationQuotaManagerTest > 
shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses STARTED

kafka.server.ReplicationQuotaManagerTest > 
shouldExceedQuotaThenReturnBackBelowBoundAsTimePasses PASSED

kafka.server.AddPartitionsToTxnRequestTest > 
shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError STARTED

kafka.server.AddPartitionsToTxnRequestTest > 
shouldReceiveOperationNotAttemptedWhenOtherPartitionHasError PASSED

kafka.server.DelayedOperationTest > testRequestPurge STARTED

kafka.server.DelayedOperationTest > testRequestPurge PASSED

kafka.server.DelayedOperationTest > testRequestExpiry STARTED

kafka.server.DelayedOperationTest > testRequestExpiry PASSED

kafka.server.DelayedOperationTest > 
shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist STARTED

kafka.server.DelayedOperationTest > 
shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist PASSED

kafka.server.DelayedOperationTest > testDelayedOperationLockOverride STARTED

kafka.server.DelayedOperationTest > testDelayedOperationLockOverride PASSED

kafka.server.DelayedOperationTest > 
shouldCancelForKeyReturningCancelledOperations STARTED

kafka.server.DelayedOperationTest > 
shouldCancelForKeyReturningCancelledOperations PASSED

kafka.server.DelayedOperationTest > testRequestSatisfaction STARTED

kafka.server.DelayedOperationTest > testRequestSatisfaction PASSED

kafka.server.DelayedOperationTest > testDelayedOperationLock STARTED

kafka.server.DelayedOperationTest > testDelayedOperationLock PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDataChange 
STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForDataChange PASSED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperSessionStateMetric STARTED

kafka.zookeeper.ZooKeeperClientTest > testZooKeeperSessionStateMetric PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetChildrenExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testConnection STARTED

kafka.zookeeper.ZooKeeperClientTest > testConnection PASSED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForCreation STARTED

kafka.zookeeper.ZooKeeperClientTest > testZNodeChangeHandlerForCreation PASSED

kafka.zookeeper.ZooKeeperClientTest > testGetAclExistingZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testGetAclExistingZNode PASSED

kafka.zookeeper.ZooKeeperClientTest > testSetAclNonExistentZNode STARTED

kafka.zookeeper.ZooKeeperClientTest > testSetAclNonExistentZNode PASSED

kafka.zookeeper.ZooKeeperClientTest 

Request to be added to the contributor list so that i can assign JIRAs to myself

2018-03-08 Thread Sanket Band
Thanks
Sanket


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-08 Thread Matthias J. Sax
Jun,

There is one more case: non-windowed aggregations. For windowed
aggregation, the changelog topic will be compact+delete. However, for
non-windowed aggregation the policy is compact only.

Even if we assume that windowed aggregations are dominant and
non-windowed aggregations are used rarely, it seems bad to not
support the feature if a non-windowed aggregation is used. Also,
non-windowed aggregation volume depends on input-stream volume, which
might be high.
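To make the two cases concrete, here is a minimal Kafka Streams sketch
(topic and store names are made up; String keys/values and default serdes
are assumed): the changelog backing the non-windowed count is compact
only, while the changelog backing the windowed count is compact+delete.

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> input = builder.stream("input-topic");

    // non-windowed aggregation: changelog of "total-counts" is compact only
    KTable<String, Long> totals = input
        .groupByKey()
        .count(Materialized.as("total-counts"));

    // windowed aggregation: changelog of "hourly-counts" is compact+delete
    KTable<Windowed<String>, Long> hourly = input
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.HOURS.toMillis(1)))
        .count(Materialized.as("hourly-counts"));

Expanding the partitions of "input-topic" therefore also raises the question
of what to do with the compact-only changelog behind "total-counts".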

Furthermore, we support stream-table join and this requires that the
stream and the table are co-partitioned. Thus, even if the table would
have lower volume but the stream must be scaled out, we also need to
scale out the table to preserve co-partitioning.


-Matthias

On 3/8/18 6:44 PM, Jun Rao wrote:
> Hi, Matthias,
> 
> My understanding is that in KStream, the only case when a changelog topic
> needs to be compacted is when the corresponding input is a KTable. In all
> other cases, the changelog topics are of compacted + deletion. So, if most
> KTables are not high volume, there may not be a need to expand its
> partitions and therefore the partitions of the corresponding changelog
> topic.
> 
> Thanks,
> 
> Jun
> 
> On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax 
> wrote:
> 
>> Jun,
>>
>> thanks for your comment. This should actually work for Streams, because
>> we don't rely on producer "hashing" but specify the partition number
>> explicitly on send().
>>
>> About not allowing to change the number of partition for changelog
>> topics: for Streams, this seems to imply that we need to create a second
>> changelog topic for each store with the new partition count. However, it
>> would be unclear when/if we can delete the old topic. Thus, it moves the
>> "problem" into the application layer. It's hard to judge for me atm what
>> the impact would be, but it's something we should pay attention to.
>>
>>
>> -Matthias
>>
>> On 3/6/18 3:45 PM, Jun Rao wrote:
>>> Hi, Matthias,
>>>
>>> Regarding your comment "If it would be time-delay based, it might be
>>> problematic
>>> for Kafka Streams: if we get the information that the new input
>> partitions
>>> are available for producing, we need to enable the new changelog
>> partitions
>>> for producing, too. If those would not be available yet, because the
>>> time-delay did not trigger yet, it would be problematic to avoid
>>> crashing.", could you just enable the changelog topic to write to its new
>>> partitions immediately?  The input topic can be configured with a delay
>> in
>>> writing to the new partitions. Initially, there won't be new data
>> produced
>>> into the newly added partitions in the input topic. However, we could
>>> prebuild the state for the new input partition and write the state
>> changes
>>> to the corresponding new partitions in the changelog topic.
>>>
>>> Hi, Jan,
>>>
>>> For a compacted topic, garbage collecting the old keys in the existing
>>> partitions after partition expansion can be tricky as your pointed out. A
>>> few options here. (a) Let brokers exchange keys across brokers during
>>> compaction. This will add complexity on the broker side. (b) Build an
>>> external tool that scans the compacted topic and drop the prefix of a
>>> partition if all records in the prefix are removable. The admin can then
>>> run this tool when the unneeded space needs to be reclaimed. (c) Don't
>>> support partition change in a compacted topic. This might be ok since
>> most
>>> compacted topics are not high volume.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin  wrote:
>>>
 Hi everyone,

 Thanks for all the comments! It appears that everyone prefers linear
 hashing because it reduces the amount of state that needs to be moved
 between consumers (for stream processing). The KIP has been updated to
>> use
 linear hashing.

 Regarding the migration endeavor: it seems that migrating producer
>> library
 to use linear hashing should be pretty straightforward without
 much operational endeavor. If we don't upgrade client library to use
>> this
 KIP, we can not support in-order delivery after partition is changed
 anyway. Suppose we upgrade client library to use this KIP, if partition
 number is not changed, the key -> partition mapping will be exactly the
 same as it is now because it is still determined using murmur_hash(key)
>> %
 original_partition_num. In other words, this change is backward
>> compatible.

 Regarding the load distribution: if we use linear hashing, the load may
>> be
 unevenly distributed because those partitions which are not split may
 receive twice as much traffic as other partitions that are split. This
 issue can be mitigated by creating topic with partitions that are
>> several
 times the number of consumers. And there will be no imbalance if the
 partition number is always doubled. So this 

Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-08 Thread Matthias J. Sax
As I just mentioned joins:

For Kafka Streams it might also be required to change the partition
count for multiple topics in a coordinated way that allows maintaining
the co-partitioning property that Kafka Streams uses to compute joins.

Any thoughts on how this could be handled?
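For illustration, a minimal sketch of such a join (topic names are made up;
both topics are keyed by the same user id): the "clicks" stream and the
"users" table must be co-partitioned, so expanding one topic without the
other breaks the join.

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> clicks = builder.stream("clicks"); // keyed by user id
    KTable<String, String> users = builder.table("users");     // keyed by user id

    // stream-table join: "clicks" and "users" need the same partition count
    // and the same partitioning of the key
    clicks.join(users, (click, user) -> user + " clicked " + click)
          .to("enriched-clicks");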


-Matthias

On 3/8/18 10:08 PM, Matthias J. Sax wrote:
> Jun,
> 
> There is one more case: non-windowed aggregations. For windowed
> aggregation, the changelog topic will be compact+delete. However, for
> non-windowed aggregation the policy is compact only.
> 
> Even if we assume that windowed aggregations are dominant and
> non-windowed aggregations are used rarely, it seems bad to not
> support the feature if a non-windowed aggregation is used. Also,
> non-windowed aggregation volume depends on input-stream volume, which
> might be high.
> 
> Furthermore, we support stream-table join and this requires that the
> stream and the table are co-partitioned. Thus, even if the table would
> have lower volume but the stream must be scaled out, we also need to
> scale out the table to preserve co-partitioning.
> 
> 
> -Matthias
> 
> On 3/8/18 6:44 PM, Jun Rao wrote:
>> Hi, Matthias,
>>
>> My understanding is that in KStream, the only case when a changelog topic
>> needs to be compacted is when the corresponding input is a KTable. In all
>> other cases, the changelog topics are of compacted + deletion. So, if most
>> KTables are not high volume, there may not be a need to expand its
>> partitions and therefore the partitions of the corresponding changelog
>> topic.
>>
>> Thanks,
>>
>> Jun
>>
>> On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax 
>> wrote:
>>
>>> Jun,
>>>
>>> thanks for your comment. This should actually work for Streams, because
>>> we don't rely on producer "hashing" but specify the partition number
>>> explicitly on send().
>>>
>>> About not allowing to change the number of partition for changelog
>>> topics: for Streams, this seems to imply that we need to create a second
>>> changelog topic for each store with the new partition count. However, it
>>> would be unclear when/if we can delete the old topic. Thus, it moves the
>>> "problem" into the application layer. It's hard to judge for me atm what
>>> the impact would be, but it's something we should pay attention to.
>>>
>>>
>>> -Matthias
>>>
>>> On 3/6/18 3:45 PM, Jun Rao wrote:
 Hi, Matthias,

 Regarding your comment "If it would be time-delay based, it might be
 problematic
 for Kafka Streams: if we get the information that the new input
>>> partitions
 are available for producing, we need to enable the new changelog
>>> partitions
 for producing, too. If those would not be available yet, because the
 time-delay did not trigger yet, it would be problematic to avoid
 crashing.", could you just enable the changelog topic to write to its new
 partitions immediately?  The input topic can be configured with a delay
>>> in
 writing to the new partitions. Initially, there won't be new data
>>> produced
 into the newly added partitions in the input topic. However, we could
 prebuild the state for the new input partition and write the state
>>> changes
 to the corresponding new partitions in the changelog topic.

 Hi, Jan,

 For a compacted topic, garbage collecting the old keys in the existing
 partitions after partition expansion can be tricky as your pointed out. A
 few options here. (a) Let brokers exchange keys across brokers during
 compaction. This will add complexity on the broker side. (b) Build an
 external tool that scans the compacted topic and drop the prefix of a
 partition if all records in the prefix are removable. The admin can then
 run this tool when the unneeded space needs to be reclaimed. (c) Don't
 support partition change in a compacted topic. This might be ok since
>>> most
 compacted topics are not high volume.

 Thanks,

 Jun


 On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin  wrote:

> Hi everyone,
>
> Thanks for all the comments! It appears that everyone prefers linear
> hashing because it reduces the amount of state that needs to be moved
> between consumers (for stream processing). The KIP has been updated to
>>> use
> linear hashing.
>
> Regarding the migration endeavor: it seems that migrating producer
>>> library
> to use linear hashing should be pretty straightforward without
> much operational endeavor. If we don't upgrade client library to use
>>> this
> KIP, we can not support in-order delivery after partition is changed
> anyway. Suppose we upgrade client library to use this KIP, if partition
> number is not changed, the key -> partition mapping will be exactly the
> same as it is now because it is still determined using murmur_hash(key)
>>> %
> original_partition_num. In other words, this change is 

Re: [VOTE] KIP-251: Allow timestamp manipulation in Processor API

2018-03-08 Thread Guozhang Wang
Thanks Matthias, that sounds good to me. I'm +1 on the KIP itself.


Guozhang

On Thu, Mar 8, 2018 at 5:46 PM, Matthias J. Sax 
wrote:

> Guozhang,
>
> I updated the code slightly to avoid object creation and I did some perf
> investigations.
>
> 1) JMH Benchmark with the below topology using TopologyTestDriver to
> pipe data through the topology:
>
> > StreamsBuilder builder = new StreamsBuilder();
> > KStream<Object, Object> stream = builder.stream("topic").transform(new
> > TransformerSupplier<Object, Object, KeyValue<Object, Object>>() {
> >     @Override
> >     public Transformer<Object, Object, KeyValue<Object, Object>> get() {
> >         return new Transformer<Object, Object, KeyValue<Object, Object>>() {
> >             ProcessorContext context;
> >
> >             @Override
> >             public void init(ProcessorContext context) {
> >                 this.context = context;
> >             }
> >
> >             @Override
> >             public KeyValue<Object, Object> transform(Object key, Object value) {
> >                 context.forward(key, value);
> >                 return null;
> >             }
> >
> >             @Override
> >             public KeyValue<Object, Object> punctuate(long timestamp) {
> >                 return null;
> >             }
> >
> >             @Override
> >             public void close() {}
> >         };
> >     }
> > });
>
> I run this with zero, one, and five downstream nodes like:
>
> > stream.foreach(new ForeachAction<Object, Object>() {
> >     @Override
> >     public void apply(Object key, Object value) {}
> > });
>
> On `trunk` I get the following numbers (5 warmup iterations, 15 test
> iterations)
>
> Zero Downstream Nodes:
>
> > # Run complete. Total time: 00:00:20
> >
> > Benchmark                     Mode  Cnt        Score       Error  Units
> > PapiBenchmark.runTestDriver  thrpt   15  2246686.693 ± 56372.920  ops/s
>
> One Downstream Node:
>
> > # Run complete. Total time: 00:00:20
> >
> > Benchmark                     Mode  Cnt        Score       Error  Units
> > PapiBenchmark.runTestDriver  thrpt   15  2206277.298 ± 51855.465  ops/s
>
> Five Downstream Nodes:
>
> > # Run complete. Total time: 00:00:20
> >
> > Benchmark                     Mode  Cnt        Score       Error  Units
> > PapiBenchmark.runTestDriver  thrpt   15  1855833.516 ± 46901.811  ops/s
>
>
> Repeating the same on my PR branch I get the following numbers:
>
> Zero Downstream Nodes:
>
> > # Run complete. Total time: 00:00:20
> >
> > Benchmark                     Mode  Cnt        Score       Error  Units
> > PapiBenchmark.runTestDriver  thrpt   15  2192891.762 ± 77598.908  ops/s
>
> One Downstream Node:
>
> > # Run complete. Total time: 00:00:20
> >
> > Benchmark                     Mode  Cnt        Score       Error  Units
> > PapiBenchmark.runTestDriver  thrpt   15  2190676.716 ± 77030.594  ops/s
>
> Five Downstream Nodes:
>
> > # Run complete. Total time: 00:00:20
> >
> > Benchmark                     Mode  Cnt        Score       Error  Units
> > PapiBenchmark.runTestDriver  thrpt   15  1921632.144 ± 66276.232  ops/s
>
>
> I also had a look at GC and did not observe any issues. The objects that
> get created are all in young gen and thus cleaning them up is cheap.
>
> Let me know if this addresses your concerns.
>
>
> -Matthias
>
>
>
>
> On 2/11/18 9:36 PM, Guozhang Wang wrote:
> > Hi Matthias,
> >
> > Just clarifying a meta question along side with my vote: we still need to
> > understand the overhead of the `To` objects during run time to determine
> > whether we would use it in the final proposal or using overloading
> > functions. Right?
> >
> >
> > Guozhang
> >
> > On Sun, Feb 11, 2018 at 9:33 PM, Guozhang Wang 
> wrote:
> >
> >> +1
> >>
> >> On Fri, Feb 9, 2018 at 5:31 AM, Bill Bejeck  wrote:
> >>
> >>> Thanks for the KIP, +1 for me.
> >>>
> >>> -Bill
> >>>
> >>> On Fri, Feb 9, 2018 at 6:45 AM, Damian Guy 
> wrote:
> >>>
>  Thanks Matthias, +1
> 
>  On Fri, 9 Feb 2018 at 02:42 Ted Yu  wrote:
> 
> > +1
> >  Original message From: "Matthias J. Sax" <
> > matth...@confluent.io> Date: 2/8/18  6:05 PM  (GMT-08:00) To:
> > dev@kafka.apache.org Subject: [VOTE] KIP-251: Allow timestamp
> > manipulation in Processor API
> > Hi,
> >
> > I want to start the vote for KIP-251:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>  251%3A+Allow+timestamp+manipulation+in+Processor+API
> >
> >
> > -Matthias
> >
> >
> 
> >>>
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
> >
> >
>
>


-- 
-- Guozhang
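
For anyone who wants to reproduce numbers in the same ballpark, a rough
sketch of how such a JMH benchmark over TopologyTestDriver could be wired up
(class name, topic name, and configuration are assumptions, not the actual
benchmark code from the discussion above):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.TopologyTestDriver;
    import org.apache.kafka.streams.test.ConsumerRecordFactory;
    import org.openjdk.jmh.annotations.*;

    @State(Scope.Thread)
    public class PapiBenchmarkSketch {

        private TopologyTestDriver driver;
        private ConsumerRecordFactory<String, String> factory;

        @Setup(Level.Trial)
        public void setUp() {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("topic").foreach((key, value) -> { });  // topology under test

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "papi-benchmark");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

            driver = new TopologyTestDriver(builder.build(), props);
            factory = new ConsumerRecordFactory<>("topic", new StringSerializer(), new StringSerializer());
        }

        @TearDown(Level.Trial)
        public void tearDown() {
            driver.close();
        }

        @Benchmark
        public void runTestDriver() {
            driver.pipeInput(factory.create("topic", "key", "value"));
        }
    }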


Re: Request to be added to the contributor list so that i can assign JIRAs to myself

2018-03-08 Thread Sanket Band
you mean the github id ? it is sband

Thanks
Sanket Band

On Fri, Mar 9, 2018 at 11:22 AM, Matthias J. Sax 
wrote:

> What is your user ID?
>
> -Matthias
>
> On 3/8/18 8:11 PM, Sanket Band wrote:
> > Thanks
> > Sanket
> >
>
>


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-08 Thread Matthias J. Sax
@Jan: You suggest copying the data from one topic to a new topic, and
providing an "offset mapping" from the old to the new topic for the
consumers. I don't quite understand how this would work.

Let's say there are 2 partitions in the original topic and 3 partitions
in the new topic. If we assume that we don't use linear hashing as you
suggest, there is no guarantee how data will be distributed in the new
topic and also no guarantee about ordering of records in the new topic.

Example (I hope I got it right -- please correct me if it's wrong)

A B C D
E F G H

could be copied to:

A C H
B E F
D G

If the consumer was at offsets 1 and 2 in the first topic, how would the
mapping be computed? We need to ensure that B C D as well as G H are
read after the switch. Thus, the offsets would need to be 1 0 0. I am not
sure how this would be computed.

Furthermore, I want to point out that the new offsets would imply that E
is consumed a second time by the consumer. E and F were consumed
originally, but E is copied after B, which was not yet consumed.

Or is there a way that we can ensure that this "flip" never happens
while we copy the data?


-Matthias



On 3/8/18 10:32 PM, Matthias J. Sax wrote:
> As I just mentioned joins:
> 
> For Kafka Streams it might also be required to change the partition
> count for multiple topics in a coordinated way that allows maintaining
> the co-partitioning property that Kafka Streams uses to compute joins.
> 
> Any thoughts how this could be handled?
> 
> 
> -Matthias
> 
> On 3/8/18 10:08 PM, Matthias J. Sax wrote:
>> Jun,
>>
>> There is one more case: non-windowed aggregations. For windowed
>> aggregation, the changelog topic will be compact+delete. However, for
>> non-windowed aggregation the policy is compact only.
>>
>> Even if we assume that windowed aggregations are dominant and
>> non-windowed aggregations are used rarely, it seems bad to not
>> support the feature if a non-windowed aggregation is used. Also,
>> non-windowed aggregation volume depends on input-stream volume, which
>> might be high.
>>
>> Furthermore, we support stream-table join and this requires that the
>> stream and the table are co-partitioned. Thus, even if the table would
>> have lower volume but the stream must be scaled out, we also need to
>> scale out the table to preserve co-partitioning.
>>
>>
>> -Matthias
>>
>> On 3/8/18 6:44 PM, Jun Rao wrote:
>>> Hi, Matthias,
>>>
>>> My understanding is that in KStream, the only case when a changelog topic
>>> needs to be compacted is when the corresponding input is a KTable. In all
>>> other cases, the changelog topics are of compacted + deletion. So, if most
>>> KTables are not high volume, there may not be a need to expand its
>>> partitions and therefore the partitions of the corresponding changelog
>>> topic.
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>> On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax 
>>> wrote:
>>>
 Jun,

 thanks for your comment. This should actually work for Streams, because
 we don't rely on producer "hashing" but specify the partition number
 explicitly on send().

 About not allowing to change the number of partition for changelog
 topics: for Streams, this seems to imply that we need to create a second
 changelog topic for each store with the new partition count. However, it
 would be unclear when/if we can delete the old topic. Thus, it moves the
 "problem" into the application layer. It's hard to judge for me atm what
 the impact would be, but it's something we should pay attention to.


 -Matthias

 On 3/6/18 3:45 PM, Jun Rao wrote:
> Hi, Matthias,
>
> Regarding your comment "If it would be time-delay based, it might be
> problematic
> for Kafka Streams: if we get the information that the new input
 partitions
> are available for producing, we need to enable the new changelog
 partitions
> for producing, too. If those would not be available yet, because the
> time-delay did not trigger yet, it would be problematic to avoid
> crashing.", could you just enable the changelog topic to write to its new
> partitions immediately?  The input topic can be configured with a delay
 in
> writing to the new partitions. Initially, there won't be new data
 produced
> into the newly added partitions in the input topic. However, we could
> prebuild the state for the new input partition and write the state
 changes
> to the corresponding new partitions in the changelog topic.
>
> Hi, Jan,
>
> For a compacted topic, garbage collecting the old keys in the existing
> partitions after partition expansion can be tricky as your pointed out. A
> few options here. (a) Let brokers exchange keys across brokers during
> compaction. This will add complexity on the broker side. (b) Build an
> external tool that scans the compacted topic and drop the prefix of a
> 

Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-03-08 Thread Ted Yu
Matthias:
Nicely written KIP.

"in_place" : can this be "in-place" ? Underscore may sometimes be miss
typed (as '-'). I think using '-' is more friendly to user.

public interface ReadOnlyKeyValueTimestampStore {

Is ReadOnlyKeyValueStoreWithTimestamp better name for the class ?

Thanks

On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang  wrote:

> Hello Matthias, thanks for the KIP.
>
> I've read through the upgrade patch section and it looks good to me, if you
> already have a WIP PR for it could you also share it here so that people
> can take a look?
>
> I'm +1 on the KIP itself. But large KIPs like this there are always some
> devil hidden in the details, so I think it is better to have the
> implementation in parallel along with the design discussion :)
>
>
> Guozhang
>
>
> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax 
> wrote:
>
> > Hi,
> >
> > I want to propose KIP-258 for the Streams API to allow storing
> > timestamps in RocksDB. This feature is the basis to resolve multiple
> > tickets (issues and feature requests).
> >
> > Looking forward to your comments about this!
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> >
> >
> > -Matthias
> >
> >
> >
>
>
> --
> -- Guozhang
>


Jenkins build is back to normal : kafka-trunk-jdk8 #2459

2018-03-08 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

2018-03-08 Thread John Roesler
Actually, replacing the MockProcessorContext in o.a.k.test could be a bit
tricky, since it would make the "streams" module depend on
"streams:test-utils", but "streams:test-utils" already depends on "streams".

At first glance, it seems like the options are:
1. leave the two separate implementations in place. This shouldn't be
underestimated, especially since our internal tests may need different
things from a mocked P.C. than our API users.
2. move the public testing artifacts into the regular streams module
3. move the unit tests for Streams into a third module that depends on both
streams and test-utils. Yuck!

Thanks,
-John

On Thu, Mar 8, 2018 at 3:16 PM, John Roesler  wrote:

> Thanks for the review, Guozhang,
>
> In response:
> 1. I missed that! I'll look into it and update the KIP.
>
> 2. I was planning to use the real implementation, since folks might
> register some metrics in the processors and want to verify the values that
> get recorded. If the concern is about initializing all the stuff that's in
> the Metrics object, I can instantiate it lazily or even make it optional by
> taking a nullable constructor parameter.
>
> 3. Agreed. I think that's the real sharp edge here. I actually think it
> would be neat to auto-trigger those scheduled punctuators, but it seems
> like that moves this component out of "mock" territory and into "driver"
> territory. Since we already have the TopologyTestDriver, I'd prefer to
> focus on keeping the mock lean. I agree it should be in the javadoc as well
> as the web documentation.
>
> Thanks,
> -John
>
> On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang  wrote:
>
>> Hello John,
>>
>> Thanks for the KIP. I made a pass over the wiki page and here are some
>> comments:
>>
>> 1. Meta-comment: there is an internal class MockProcessorContext under the
>> o.a.k.test package, which should be replaced as part of this KIP.
>>
>> 2. In @Override StreamsMetrics metrics(), will you return a fully created
>> StreamsMetricsImpl object or are you planning to use the
>> MockStreamsMetrics? Note that for the latter case you probably need to
>> look
>> into https://issues.apache.org/jira/browse/KAFKA-5676 as well.
>>
>> 3. Not related to the KIP changes themselves: about
>> "context.scheduledPunctuators": we need to well document that in the
>> MockProcessorContext the scheduled punctuator will never be
>> auto-triggered,
>> and hence it is only for testing people's code that some punctuators are
>> indeed registered, and if people want full auto punctuation testing they
>> have to go with TopologyTestDriver.
>>
>>
>>
>> Guozhang
>>
>>
>> On Wed, Mar 7, 2018 at 8:04 PM, John Roesler  wrote:
>>
>> > On Wed, Mar 7, 2018 at 8:03 PM, John Roesler  wrote:
>> >
>> > > Thanks Ted,
>> > >
>> > > Sure thing; I updated the example code in the KIP with a little
>> snippet.
>> > >
>> > > -John
>> > >
>> > > On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu  wrote:
>> > >
>> > >> Looks good.
>> > >>
>> > >> See if you can add punctuator into the sample code.
>> > >>
>> > >> On Wed, Mar 7, 2018 at 7:10 PM, John Roesler 
>> wrote:
>> > >>
>> > >> > Dear Kafka community,
>> > >> >
>> > >> > I am proposing KIP-267 to augment the public Streams test utils
>> API.
>> > >> > The goal is to simplify testing of Kafka Streams applications.
>> > >> >
>> > >> > Please find details in the
>> > >> > wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > >> > 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils
>> > >> >
>> > >> > An initial WIP PR can be found here:https://github.com/
>> > >> > apache/kafka/pull/4662
>> > >> >
>> > >> > I also included the user-list (please hit "reply-all" to include
>> both
>> > >> > lists in this KIP discussion).
>> > >> >
>> > >> > Thanks,
>> > >> >
>> > >> > -John
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> -- Guozhang
>>
>
>


Jenkins build is back to normal : kafka-trunk-jdk7 #3234

2018-03-08 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-03-08 Thread Ted Yu
Matthias:
For my point #1, I don't have a preference as to which separator is chosen.
Given the background you mentioned, the current choice is good.

For #2, I think my proposal is better since it is closer to English grammar.

It would be good to hear what other people think.

On Thu, Mar 8, 2018 at 4:02 PM, Matthias J. Sax 
wrote:

> Thanks for the comments!
>
> @Guozhang:
>
> So far, there is one PR for the rebalance metadata upgrade fix
> (addressing the mentioned
> https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first
> impression of how the metadata upgrade works, including a system test:
> https://github.com/apache/kafka/pull/4636
>
> I can share other PRs as soon as they are ready. I agree that the KIP is
> complex and I am ok with putting out more code to give better discussion
> context.
>
> @Ted:
>
> I picked `_` instead of `-` to align with the `processing.guarantee`
> parameter that accepts `at_least_once` and `exactly_once` as values.
> Personally, I don't care about underscore vs dash but I prefer
> consistency. If you feel strongly about it, we can also change it to `-`.
>
> About the interface name: I am fine either way -- I stripped the `With`
> to keep the name a little shorter. Would be good to get feedback from
> others and pick the name the majority prefers.
>
> @John:
>
> We can certainly change it. I agree that it would not make a difference.
> I'll dig into the code to see if any of the two version might introduce
> undesired complexity and update the KIP if I don't hit an issue with
> putting the `-v2` to the store directory instead of `rocksdb-v2`
>
>
> -Matthias
>
>
> On 3/8/18 2:44 PM, John Roesler wrote:
> > Hey Matthias,
> >
> > The KIP looks good to me. I had several questions queued up, but they
> were
> > all in the "rejected alternatives" section... oh, well.
> >
> > One very minor thought re changing the state directory from
> "//<
> > application.id>//rocksdb/storeName/" to "//<
> > application.id>//rocksdb-v2/storeName/": if you put the "v2"
> > marker on the storeName part of the path (i.e., "//<
> > application.id>//rocksdb/storeName-v2/"), then you get the same
> > benefits without altering the high-level directory structure.
> >
> > It may not matter, but I could imagine people running scripts to monitor
> > rocksdb disk usage for each task, or other such use cases.
> >
> > Thanks,
> > -John
> >
> > On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu  wrote:
> >
> >> Matthias:
> >> Nicely written KIP.
> >>
> >> "in_place" : can this be "in-place" ? Underscore may sometimes be miss
> >> typed (as '-'). I think using '-' is more friendly to user.
> >>
> >> public interface ReadOnlyKeyValueTimestampStore {
> >>
> >> Is ReadOnlyKeyValueStoreWithTimestamp better name for the class ?
> >>
> >> Thanks
> >>
> >> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang 
> wrote:
> >>
> >>> Hello Matthias, thanks for the KIP.
> >>>
> >>> I've read through the upgrade patch section and it looks good to me, if
> >> you
> >>> already have a WIP PR for it could you also share it here so that
> people
> >>> can take a look?
> >>>
> >>> I'm +1 on the KIP itself. But large KIPs like this there are always
> some
> >>> devil hidden in the details, so I think it is better to have the
> >>> implementation in parallel along with the design discussion :)
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax  >
> >>> wrote:
> >>>
>  Hi,
> 
>  I want to propose KIP-258 for the Streams API to allow storing
>  timestamps in RocksDB. This feature is the basis to resolve multiple
>  tickets (issues and feature requests).
> 
>  Looking forward to your comments about this!
> 
>  https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>  258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB
> 
> 
>  -Matthias
> 
> 
> 
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
>
>


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-08 Thread Jun Rao
Hi, Matthias,

My understanding is that in KStream, the only case when a changelog topic
needs to be compacted is when the corresponding input is a KTable. In all
other cases, the changelog topics use compaction + deletion. So, if most
KTables are not high volume, there may not be a need to expand its
partitions and therefore the partitions of the corresponding changelog
topic.

Thanks,

Jun

On Wed, Mar 7, 2018 at 2:34 PM, Matthias J. Sax 
wrote:

> Jun,
>
> thanks for your comment. This should actually work for Streams, because
> we don't rely on producer "hashing" but specify the partition number
> explicitly on send().
>
> About not allowing to change the number of partition for changelog
> topics: for Streams, this seems to imply that we need to create a second
> changelog topic for each store with the new partition count. However, it
> would be unclear when/if we can delete the old topic. Thus, it moves the
> "problem" into the application layer. It's hard to judge for me atm what
> the impact would be, but it's something we should pay attention to.
>
>
> -Matthias
>
> On 3/6/18 3:45 PM, Jun Rao wrote:
> > Hi, Matthias,
> >
> > Regarding your comment "If it would be time-delay based, it might be
> > problematic
> > for Kafka Streams: if we get the information that the new input
> partitions
> > are available for producing, we need to enable the new changelog
> partitions
> > for producing, too. If those would not be available yet, because the
> > time-delay did not trigger yet, it would be problematic to avoid
> > crashing.", could you just enable the changelog topic to write to its new
> > partitions immediately?  The input topic can be configured with a delay
> in
> > writing to the new partitions. Initially, there won't be new data
> produced
> > into the newly added partitions in the input topic. However, we could
> > prebuild the state for the new input partition and write the state
> changes
> > to the corresponding new partitions in the changelog topic.
> >
> > Hi, Jan,
> >
> > For a compacted topic, garbage collecting the old keys in the existing
> > partitions after partition expansion can be tricky as your pointed out. A
> > few options here. (a) Let brokers exchange keys across brokers during
> > compaction. This will add complexity on the broker side. (b) Build an
> > external tool that scans the compacted topic and drop the prefix of a
> > partition if all records in the prefix are removable. The admin can then
> > run this tool when the unneeded space needs to be reclaimed. (c) Don't
> > support partition change in a compacted topic. This might be ok since
> most
> > compacted topics are not high volume.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Mar 6, 2018 at 10:38 AM, Dong Lin  wrote:
> >
> >> Hi everyone,
> >>
> >> Thanks for all the comments! It appears that everyone prefers linear
> >> hashing because it reduces the amount of state that needs to be moved
> >> between consumers (for stream processing). The KIP has been updated to
> use
> >> linear hashing.
> >>
> >> Regarding the migration endeavor: it seems that migrating producer
> library
> >> to use linear hashing should be pretty straightforward without
> >> much operational endeavor. If we don't upgrade client library to use
> this
> >> KIP, we can not support in-order delivery after partition is changed
> >> anyway. Suppose we upgrade client library to use this KIP, if partition
> >> number is not changed, the key -> partition mapping will be exactly the
> >> same as it is now because it is still determined using murmur_hash(key)
> %
> >> original_partition_num. In other words, this change is backward
> compatible.
> >>
> >> Regarding the load distribution: if we use linear hashing, the load may
> be
> >> unevenly distributed because those partitions which are not split may
> >> receive twice as much traffic as other partitions that are split. This
> >> issue can be mitigated by creating topic with partitions that are
> several
> >> times the number of consumers. And there will be no imbalance if the
> >> partition number is always doubled. So this imbalance seems acceptable.
> >>
> >> Regarding storing the partition strategy as per-topic config: It seems
> not
> >> necessary since we can still use murmur_hash as the default hash
> function
> >> and additionally apply the linear hashing algorithm if the partition
> number
> >> has increased. Not sure if there is any use-case for producer to use a
> >> different hash function. Jason, can you check if there is some use-case
> >> that I missed for using the per-topic partition strategy?
> >>
> >> Regarding how to reduce latency (due to state store/load) in stream
> >> processing consumer when partition number changes: I need to read the
> Kafka
> >> Stream code to understand how Kafka Stream currently migrate state
> between
> >> consumers when the application is added/removed for a given job. I will
> >> reply after I finish 

Re: [DISCUSS] KIP-267: Add Processor Unit Test Support to Kafka Streams Test Utils

2018-03-08 Thread John Roesler
Thanks for the review, Guozhang,

In response:
1. I missed that! I'll look into it and update the KIP.

2. I was planning to use the real implementation, since folks might
register some metrics in the processors and want to verify the values that
get recorded. If the concern is about initializing all the stuff that's in
the Metrics object, I can instantiate it lazily or even make it optional by
taking a nullable constructor parameter.

3. Agreed. I think that's the real sharp edge here. I actually think it
would be neat to auto-trigger those scheduled punctuators, but it seems
like that moves this component out of "mock" territory and into "driver"
territory. Since we already have the TopologyTestDriver, I'd prefer to
focus on keeping the mock lean. I agree it should be in the javadoc as well
as the web documentation.
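To make that limitation concrete, a rough sketch of the kind of test the mock
would enable; MockProcessorContext and the scheduledPunctuators() accessor
follow the current KIP/PR draft and may still change:

    final Processor<String, String> processor = new AbstractProcessor<String, String>() {
        @Override
        public void init(final ProcessorContext context) {
            super.init(context);
            // register a wall-clock punctuation every second
            context.schedule(1000L, PunctuationType.WALL_CLOCK_TIME, timestamp -> { });
        }

        @Override
        public void process(final String key, final String value) { }
    };

    final MockProcessorContext context = new MockProcessorContext();
    processor.init(context);

    // the mock never fires punctuators itself; the test can only verify registration
    assertEquals(1, context.scheduledPunctuators().size());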

Thanks,
-John

On Thu, Mar 8, 2018 at 1:46 PM, Guozhang Wang  wrote:

> Hello John,
>
> Thanks for the KIP. I made a pass over the wiki page and here are some
> comments:
>
> 1. Meta-comment: there is an internal class MockProcessorContext under the
> o.a.k.test package, which should be replaced as part of this KIP.
>
> 2. In @Override StreamsMetrics metrics(), will you return a fully created
> StreamsMetricsImpl object or are you planning to use the
> MockStreamsMetrics? Note that for the latter case you probably need to look
> into https://issues.apache.org/jira/browse/KAFKA-5676 as well.
>
> 3. Not related to the KIP changes themselves: about
> "context.scheduledPunctuators": we need to well document that in the
> MockProcessorContext the scheduled punctuator will never be auto-triggered,
> and hence it is only for testing people's code that some punctuators are
> indeed registered, and if people want full auto punctuation testing they
> have to go with TopologyTestDriver.
>
>
>
> Guozhang
>
>
> On Wed, Mar 7, 2018 at 8:04 PM, John Roesler  wrote:
>
> > On Wed, Mar 7, 2018 at 8:03 PM, John Roesler  wrote:
> >
> > > Thanks Ted,
> > >
> > > Sure thing; I updated the example code in the KIP with a little
> snippet.
> > >
> > > -John
> > >
> > > On Wed, Mar 7, 2018 at 7:18 PM, Ted Yu  wrote:
> > >
> > >> Looks good.
> > >>
> > >> See if you can add punctuator into the sample code.
> > >>
> > >> On Wed, Mar 7, 2018 at 7:10 PM, John Roesler 
> wrote:
> > >>
> > >> > Dear Kafka community,
> > >> >
> > >> > I am proposing KIP-267 to augment the public Streams test utils API.
> > >> > The goal is to simplify testing of Kafka Streams applications.
> > >> >
> > >> > Please find details in the
> > >> > wiki:https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > >> > 267%3A+Add+Processor+Unit+Test+Support+to+Kafka+Streams+Test+Utils
> > >> >
> > >> > An initial WIP PR can be found here:https://github.com/
> > >> > apache/kafka/pull/4662
> > >> >
> > >> > I also included the user-list (please hit "reply-all" to include
> both
> > >> > lists in this KIP discussion).
> > >> >
> > >> > Thanks,
> > >> >
> > >> > -John
> > >> >
> > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>


Re: [DISCUSS] KIP-253: Support in-order message delivery with partition expansion

2018-03-08 Thread Jun Rao
Hi, Jan,

Thanks for the feedback. Just some comments on the earlier points that you
mentioned.

50. You brought up the question of whether existing data needs to be copied
during partition expansion. My understanding of your view is that avoiding
copying existing data would be more efficient, but that it doesn't work well with
compacted topics since some keys in the original partitions will never be
cleaned. It would be useful to understand your use case of compacted topics
a bit more. In the common use case, the data volume in a compacted topic
may not be large. So, I am not sure if there is a strong need to expand
partitions in a compacted topic, at least initially.

51. "Growing the topic by an integer factor does not require any state
redistribution at all." Could you clarify this a bit more? Let's say you
have a consumer app that computes the windowed count per key. If you double
the number of partitions from 1 to 2 and grow the consumer instances from 1
to 2, we would need to redistribute some of the counts to the new consumer
instance. Regarding linear hashing, it's true that it won't solve the
problem with compacted topics. The main benefit is that it redistributes
the keys in one partition to no more than two partitions, which could help
redistribute the state.
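For reference, a small sketch of the linear-hashing lookup being discussed
(method and variable names are mine, not from the KIP): when the partition
count grows by one, only the keys of the single partition being split move,
and they move to the one newly added partition.

    // initialCount = partition count at topic creation,
    // currentCount = partition count after expansion (>= initialCount)
    static int linearHashPartition(final int keyHash, final int initialCount, final int currentCount) {
        final int h = keyHash & 0x7fffffff;   // non-negative hash
        int low = initialCount;
        while (low * 2 <= currentCount) {     // account for completed doubling rounds
            low *= 2;
        }
        final int candidate = h % (low * 2);  // slot in the (partially) doubled space
        // if that slot does not exist yet, the key stays in its unsplit partition
        return candidate < currentCount ? candidate : h % low;
    }

If currentCount equals initialCount this reduces to h % initialCount, which
matches the backward-compatibility argument made earlier in the thread.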

52. Good point on coordinating the expansion of 2 topics that need to be
joined together. This is where the 2-phase partition expansion could
potentially help. In the first phase, we could add new partitions to the 2
topics one at a time but without publishing to the new partitions. Then, we
can add new consumer instances to pick up the new partitions. In this
transition phase, no reshuffling is needed since no data is coming from the
new partitions. Finally, we can enable the publishing to the new
partitions.

53. "Migrating consumer is a step that might be made completly unnecessary
if - for example streams - takes the gcd as partitioning scheme instead of
enforcing 1 to 1." Not sure that I fully understand this. I think you mean
that a consumer application can run more instances than the number of
partitions. In that case, the consumer can just repartitioning the input
data according to the number of instances. This is possible, but just has
the overhead of reshuffling the data.

54. "The other thing I wanted to mention is that I believe the current
suggestion (without copying data over) can be implemented in pure userland
with a custom partitioner and a small feedbackloop from ProduceResponse =>
Partitionier in coorporation with a change management system." I am not
sure a customized partitioner itself solves the problem. We probably need
some broker side support to enforce when the new partitions can be used. We
also need some support on the consumer/kstream side to preserve the per key
ordering and potentially migrate the processing state. This is not trivial
and I am not sure if it's ideal to fully push to the application space.

Jun


On Tue, Mar 6, 2018 at 10:33 PM, Jan Filipiak 
wrote:

> Hi Dong,
>
> are you actually reading my emails, or are you just using the thread I
> started for general announcements regarding the KIP?
>
> I tried to argue really hard against linear hashing. Growing the topic by
> an integer factor does not require any state redistribution at all. I fail
> to see completely where linear hashing helps on log compacted topics.
>
> If you are not willing to explain to me what I might be overlooking: that
> is fine.
> But I ask you to not reply to my emails then. Please understand my
> frustration with this.
>
> Best Jan
>
>
>
> On 06.03.2018 19:38, Dong Lin wrote:
>
>> Hi everyone,
>>
>> Thanks for all the comments! It appears that everyone prefers linear
>> hashing because it reduces the amount of state that needs to be moved
>> between consumers (for stream processing). The KIP has been updated to use
>> linear hashing.
>>
>> Regarding the migration endeavor: it seems that migrating producer library
>> to use linear hashing should be pretty straightforward without
>> much operational endeavor. If we don't upgrade client library to use this
>> KIP, we can not support in-order delivery after partition is changed
>> anyway. Suppose we upgrade client library to use this KIP, if partition
>> number is not changed, the key -> partition mapping will be exactly the
>> same as it is now because it is still determined using murmur_hash(key) %
>> original_partition_num. In other words, this change is backward
>> compatible.
>>
>> Regarding the load distribution: if we use linear hashing, the load may be
>> unevenly distributed because those partitions which are not split may
>> receive twice as much traffic as other partitions that are split. This
>> issue can be mitigated by creating topic with partitions that are several
>> times the number of consumers. And there will be no imbalance if the
>> partition number is always doubled. So this imbalance seems acceptable.
>>
>> Regarding storing the 

Jenkins build is back to normal : kafka-trunk-jdk9 #455

2018-03-08 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-258: Allow to Store Record Timestamps in RocksDB

2018-03-08 Thread Matthias J. Sax
Thanks for the comments!

@Guozhang:

So far, there is one PR for the rebalance metadata upgrade fix
(addressing the mentioned
https://issues.apache.org/jira/browse/KAFKA-6054). It gives a first
impression of how the metadata upgrade works, including a system test:
https://github.com/apache/kafka/pull/4636

I can share other PRs as soon as they are ready. I agree that the KIP is
complex and I am ok with putting out more code to give better discussion
context.

@Ted:

I picked `_` instead of `-` to align with the `processing.guarantee`
parameter that accepts `at_least_once` and `exactly_once` as values.
Personally, I don't care about underscore vs dash but I prefer
consistency. If you feel strongly about it, we can also change it to `-`.
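For reference, a minimal example of how that existing parameter is set today
(standard Streams config; the new upgrade option from the KIP is not shown):

    final Properties props = new Properties();
    // existing values already use underscores: "at_least_once" / "exactly_once"
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);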

About the interface name: I am fine either way -- I stripped the `With`
to keep the name a little shorter. Would be good to get feedback from
others and pick the name the majority prefers.

@John:

We can certainly change it. I agree that it would not make a difference.
I'll dig into the code to see if either of the two versions might introduce
undesired complexity, and update the KIP if I don't hit an issue with
putting the `-v2` on the store directory instead of `rocksdb-v2`.


-Matthias


On 3/8/18 2:44 PM, John Roesler wrote:
> Hey Matthias,
> 
> The KIP looks good to me. I had several questions queued up, but they were
> all in the "rejected alternatives" section... oh, well.
> 
> One very minor thought re changing the state directory from "//<
> application.id>//rocksdb/storeName/" to "//<
> application.id>//rocksdb-v2/storeName/": if you put the "v2"
> marker on the storeName part of the path (i.e., "//<
> application.id>//rocksdb/storeName-v2/"), then you get the same
> benefits without altering the high-level directory structure.
> 
> It may not matter, but I could imagine people running scripts to monitor
> rocksdb disk usage for each task, or other such use cases.
> 
> Thanks,
> -John
> 
> On Thu, Mar 8, 2018 at 2:02 PM, Ted Yu  wrote:
> 
>> Matthias:
>> Nicely written KIP.
>>
>> "in_place" : can this be "in-place" ? Underscore may sometimes be miss
>> typed (as '-'). I think using '-' is more friendly to user.
>>
>> public interface ReadOnlyKeyValueTimestampStore {
>>
>> Is ReadOnlyKeyValueStoreWithTimestamp better name for the class ?
>>
>> Thanks
>>
>> On Thu, Mar 8, 2018 at 1:29 PM, Guozhang Wang  wrote:
>>
>>> Hello Matthias, thanks for the KIP.
>>>
>>> I've read through the upgrade patch section and it looks good to me, if
>> you
>>> already have a WIP PR for it could you also share it here so that people
>>> can take a look?
>>>
>>> I'm +1 on the KIP itself. But large KIPs like this there are always some
>>> devil hidden in the details, so I think it is better to have the
>>> implementation in parallel along with the design discussion :)
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Wed, Mar 7, 2018 at 2:12 PM, Matthias J. Sax 
>>> wrote:
>>>
 Hi,

 I want to propose KIP-258 for the Streams API to allow storing
 timestamps in RocksDB. This feature is the basis to resolve multiple
 tickets (issues and feature requests).

 Looking forward to your comments about this!

 https://cwiki.apache.org/confluence/display/KAFKA/KIP-
 258%3A+Allow+to+Store+Record+Timestamps+in+RocksDB


 -Matthias



>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 



signature.asc
Description: OpenPGP digital signature