Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-24 Thread Dong Lin
Hey Colin,

Thanks for the reply! Please see my comment inline.

On Fri, Nov 24, 2017 at 9:39 PM, Colin McCabe  wrote:

> On Thu, Nov 23, 2017, at 18:35, Dong Lin wrote:
> > Hey Colin,
> >
> > Thanks for the KIP! This is definitely useful when there are many idle
> > partitions in the clusters.
> >
> > Just in case it is useful, I will provide some number here. We observe
> > that for a clsuter that have around 2.5k partitions per broker, the
> > ProduceRequestTotal time average value is around 25 ms. For a cluster
> > with 2.5k partitions per broker whose AllTopicsBytesInRate is only
> around 6
> > MB/s, the ProduceRequestTotalTime average value is around 180 ms, most of
> > which is spent on ProduceRequestRemoteTime. The increased
> > ProduceRequestTotalTime significantly reduces throughput of producers
> > with ack=all. I think this KIP can help address this problem.
>
> Hi Dong,
>
> Thanks for the numbers.  It's good to have empirical confirmation that
> this will help!
>
> >
> > Here are some of my ideas on the current KIP:
> >
> > - The KIP says that the follower will include a partition in
> > the IncrementalFetchRequest if the LEO of the partition has been updated.
> > It seems that doing so may prevent leader from knowing information (e.g.
> > LogStartOffset) of the follower that will otherwise be included in the
> > FetchRequest. Maybe we should have a paragraph to explicitly define the
> > full criteria of when the fetcher should include a partition in the
> > FetchResponse and probably include logStartOffset as part of the
> > criteria?
>
> Hmm.  That's a good point... we should think about whether we need to
> send partition information in an incremental update when the LSO
> changes.
>
> Sorry if this is a dumb question, but what does the leader do with the
> logStartOffset of the followers?  When does the leader need to know it?
> Also, how often do we expect it to be changed by the LogCleaner?
>

The leader uses logStartOffset of the followers to determine the
logStartOffset of the partition. It is needed to handle
DeleteRecordsRequest. It can be changed if the log is deleted on the
follower due to log retention.


>
> > - It seems that every time the set of partitions in the
> > ReplicaFetcherThread is changed, or if follower restarts, a new UUID will
> > be generated in the leader and leader will add a new entry in the
> > in-memory  map to map the UUID to list of partitions (and other metadata
> such as
> > fetch offset). This map with grow over time depending depending on the
> > frequency of events such as partition movement or broker restart. As you
> mentioned,
> > we probably need to timeout entries in this map. But there is also
> > tradeoff  in this timeout -- large timeout increase memory usage whereas
> smaller
> > timeout increases frequency of the full FetchRequest. Could you specify
> > the default value of this timeout and probably also explain how it
> affects
> > the performance of this KIP?
>
> Right, there are definitely some tradeoffs here.
>
> Since fetches happen very frequently, I think even a short UUID cache
> expiration time of a minute or two should already be enough to ensure
> that 99%+ of all fetch requests are incremental fetch requests.  I think
> the idea of partitioning the cache per broker is a good one which will
> let us limit memory consumption even more.
>
> If replica fetcher threads do change their partition assignments often,
> we could also add a special "old UUID to uncache" field to the
> FetchRequest as well.  That would avoid having to wait for the full
> minute to clear the UUID cache.  That's probably not  necessary,
> though...
>

I think expiration time of a minute is two is probably reasonable. Yeah we
can discuss it further after the KIP is updated. Thanks!


>
> > Also, do you think we can avoid having duplicate
> > entries from the same ReplicaFetcher (in case of partition set change) by
> > using brokerId+fetcherThreadIndex as the UUID?
>
> My concern about that is that if two messages get reordered somehow, or
> an update gets lost, the view of partitions which the fetcher thread has
> could diverge from the view which the leader has.  Also, UUIDs work for
> consumers, but clearly consumers cannot use a
> brokerID+fetcherThreadIndex.  It's simpler to have one system than two.
>

Yeah this can be a problem if two messages are lost of reordered somehow. I
am just wondering whether there actually exists a scenario where the
message can be ordered between ReplicaFetcherThread and the leader. My gut
feel is that since the ReplicaFetcherThread talks to leader using a single
TCP connection with inflight requests = 1, out-of-order delivery probably
should not happen. I may be wrong though. What do you think?


>
> >
> > I agree with the previous comments that 1) ideally we want to evolve the
> > existing existing FetchRequest instead of adding a new request type; and
> > 2) KIP hopefully can also apply to replication 

Re: [DISCUSS] KIP-227: Introduce Incremental FetchRequests to Increase Partition Scalability

2017-11-24 Thread Colin McCabe
On Thu, Nov 23, 2017, at 18:35, Dong Lin wrote:
> Hey Colin,
> 
> Thanks for the KIP! This is definitely useful when there are many idle
> partitions in the clusters.
> 
> Just in case it is useful, I will provide some number here. We observe
> that for a clsuter that have around 2.5k partitions per broker, the
> ProduceRequestTotal time average value is around 25 ms. For a cluster
> with 2.5k partitions per broker whose AllTopicsBytesInRate is only around 6
> MB/s, the ProduceRequestTotalTime average value is around 180 ms, most of
> which is spent on ProduceRequestRemoteTime. The increased
> ProduceRequestTotalTime significantly reduces throughput of producers
> with ack=all. I think this KIP can help address this problem.

Hi Dong,

Thanks for the numbers.  It's good to have empirical confirmation that
this will help!

> 
> Here are some of my ideas on the current KIP:
> 
> - The KIP says that the follower will include a partition in
> the IncrementalFetchRequest if the LEO of the partition has been updated.
> It seems that doing so may prevent leader from knowing information (e.g.
> LogStartOffset) of the follower that will otherwise be included in the
> FetchRequest. Maybe we should have a paragraph to explicitly define the
> full criteria of when the fetcher should include a partition in the
> FetchResponse and probably include logStartOffset as part of the
> criteria?

Hmm.  That's a good point... we should think about whether we need to
send partition information in an incremental update when the LSO
changes.

Sorry if this is a dumb question, but what does the leader do with the
logStartOffset of the followers?  When does the leader need to know it? 
Also, how often do we expect it to be changed by the LogCleaner?

> - It seems that every time the set of partitions in the
> ReplicaFetcherThread is changed, or if follower restarts, a new UUID will
> be generated in the leader and leader will add a new entry in the
> in-memory  map to map the UUID to list of partitions (and other metadata such 
> as
> fetch offset). This map with grow over time depending depending on the
> frequency of events such as partition movement or broker restart. As you 
> mentioned,
> we probably need to timeout entries in this map. But there is also
> tradeoff  in this timeout -- large timeout increase memory usage whereas 
> smaller
> timeout increases frequency of the full FetchRequest. Could you specify
> the default value of this timeout and probably also explain how it affects
> the performance of this KIP?

Right, there are definitely some tradeoffs here.

Since fetches happen very frequently, I think even a short UUID cache
expiration time of a minute or two should already be enough to ensure
that 99%+ of all fetch requests are incremental fetch requests.  I think
the idea of partitioning the cache per broker is a good one which will
let us limit memory consumption even more.

If replica fetcher threads do change their partition assignments often,
we could also add a special "old UUID to uncache" field to the
FetchRequest as well.  That would avoid having to wait for the full
minute to clear the UUID cache.  That's probably not  necessary,
though...

> Also, do you think we can avoid having duplicate
> entries from the same ReplicaFetcher (in case of partition set change) by
> using brokerId+fetcherThreadIndex as the UUID?

My concern about that is that if two messages get reordered somehow, or
an update gets lost, the view of partitions which the fetcher thread has
could diverge from the view which the leader has.  Also, UUIDs work for
consumers, but clearly consumers cannot use a 
brokerID+fetcherThreadIndex.  It's simpler to have one system than two.

> 
> I agree with the previous comments that 1) ideally we want to evolve the
> existing existing FetchRequest instead of adding a new request type; and
> 2) KIP hopefully can also apply to replication service such as e.g.
> MirrorMaker. In addition, ideally we probably want to implement the new
> logic in a separate class without having to modify the existing class
> (e.g. Log, LogManager) so that the implementation and design can be simpler
> going forward. Motivated by these concepts, I am wondering if the following
> alternative design may be worth thinking.
> 
> Here are the details of a potentially feasible alternative approach.
> 
> *Protocol change: *
> 
> - We add a fetcherId of string type in the FetchRequest. This fetcherId
> is similarly to UUID and helps leader correlate the fetcher (i.e.
> ReplicaFetcherThread or MM consumer) with the state of the fetcher. This
> fetcherId is determined by the fetcher. For most consumers this fetcherId
> is null. For ReplicaFetcherThread this fetcherId = brokerId +
> threadIndex.
> For MM this is groupId+someIndex.

As Jay pointed out earlier, there are other consumers besides
MirrorMaker that might want to take advantage of incremental fetch
requests.  He gave the example of the HDFS connector, but there are many

Jenkins build is back to normal : kafka-trunk-jdk8 #2239

2017-11-24 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk9 #221

2017-11-24 Thread Apache Jenkins Server
See 




[GitHub] kafka pull request #4250: KAFKA-6261: Fix exception thrown by request loggin...

2017-11-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4250


---


Build failed in Jenkins: kafka-trunk-jdk9 #220

2017-11-24 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: Temporarily disable testLogStartOffsetCheckpoint

--
[...truncated 1.44 MB...]
kafka.zk.ZKPathTest > testMakeSurePersistsPathExists STARTED

kafka.zk.ZKPathTest > testMakeSurePersistsPathExists PASSED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment STARTED

kafka.zk.KafkaZkClientTest > testSetGetAndDeletePartitionReassignment PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndVersion PASSED

kafka.zk.KafkaZkClientTest > testGetChildren STARTED

kafka.zk.KafkaZkClientTest > testGetChildren PASSED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset STARTED

kafka.zk.KafkaZkClientTest > testSetAndGetConsumerOffset PASSED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testEntityConfigManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testCreateRecursive STARTED

kafka.zk.KafkaZkClientTest > testCreateRecursive PASSED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData STARTED

kafka.zk.KafkaZkClientTest > testGetConsumerOffsetNoData PASSED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods STARTED

kafka.zk.KafkaZkClientTest > testDeleteTopicPathMethods PASSED

kafka.zk.KafkaZkClientTest > testAclManagementMethods STARTED

kafka.zk.KafkaZkClientTest > testAclManagementMethods PASSED

kafka.zk.KafkaZkClientTest > testGetDataAndStat STARTED

kafka.zk.KafkaZkClientTest > testGetDataAndStat PASSED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath STARTED

kafka.zk.KafkaZkClientTest > testConditionalUpdatePath PASSED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods STARTED

kafka.zk.KafkaZkClientTest > testTopicAssignmentMethods PASSED

kafka.zk.KafkaZkClientTest > testDeleteRecursive STARTED

kafka.zk.KafkaZkClientTest > testDeleteRecursive PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytesWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > 
testIteratorIsConsistentWithCompression PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testIteratorIsConsistent PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEqualsWithCompression 
PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testWrittenEqualsRead PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testEquals PASSED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes STARTED

kafka.javaapi.message.ByteBufferMessageSetTest > testSizeInBytes PASSED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.javaapi.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testClusterIdMetric STARTED

kafka.metrics.MetricsTest > testClusterIdMetric PASSED

kafka.metrics.MetricsTest > testControllerMetrics STARTED

kafka.metrics.MetricsTest > testControllerMetrics PASSED

kafka.metrics.MetricsTest > testWindowsStyleTagNames STARTED

kafka.metrics.MetricsTest > testWindowsStyleTagNames PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut STARTED

kafka.metrics.MetricsTest > testBrokerTopicMetricsBytesInOut PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.security.auth.ResourceTypeTest > testJavaConversions STARTED

kafka.security.auth.ResourceTypeTest > testJavaConversions PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils STARTED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

Build failed in Jenkins: kafka-trunk-jdk8 #2238

2017-11-24 Thread Apache Jenkins Server
See 


Changes:

[ismael] MINOR: Temporarily disable testLogStartOffsetCheckpoint

--
[...truncated 2.94 MB...]
org.apache.kafka.common.security.JaasContextTest > testControlFlag PASSED

org.apache.kafka.common.security.plain.PlainSaslServerTest > 
noAuthorizationIdSpecified STARTED

org.apache.kafka.common.security.plain.PlainSaslServerTest > 
noAuthorizationIdSpecified PASSED

org.apache.kafka.common.security.plain.PlainSaslServerTest > 
authorizatonIdEqualsAuthenticationId STARTED

org.apache.kafka.common.security.plain.PlainSaslServerTest > 
authorizatonIdEqualsAuthenticationId PASSED

org.apache.kafka.common.security.plain.PlainSaslServerTest > 
authorizatonIdNotEqualsAuthenticationId STARTED

org.apache.kafka.common.security.plain.PlainSaslServerTest > 
authorizatonIdNotEqualsAuthenticationId PASSED

org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest 
> testProducerWithInvalidCredentials STARTED

org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest 
> testProducerWithInvalidCredentials PASSED

org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest 
> testTransactionalProducerWithInvalidCredentials STARTED

org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest 
> testTransactionalProducerWithInvalidCredentials PASSED

org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest 
> testConsumerWithInvalidCredentials STARTED

org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest 
> testConsumerWithInvalidCredentials PASSED

org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest 
> testAdminClientWithInvalidCredentials STARTED

org.apache.kafka.common.security.authenticator.ClientAuthenticationFailureTest 
> testAdminClientWithInvalidCredentials PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMissingUsernameSaslPlain STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMissingUsernameSaslPlain PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testValidSaslScramMechanisms STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testValidSaslScramMechanisms PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslScramSslServerWithoutSaslAuthenticateHeaderFailure STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslScramSslServerWithoutSaslAuthenticateHeaderFailure PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslScramPlaintextServerWithoutSaslAuthenticateHeaderFailure STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslScramPlaintextServerWithoutSaslAuthenticateHeaderFailure PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslScramPlaintextServerWithoutSaslAuthenticateHeader STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslScramPlaintextServerWithoutSaslAuthenticateHeader PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMechanismPluggability STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMechanismPluggability PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testScramUsernameWithSpecialCharacters STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testScramUsernameWithSpecialCharacters PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testApiVersionsRequestWithUnsupportedVersion STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testApiVersionsRequestWithUnsupportedVersion PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMissingPasswordSaslPlain STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testMissingPasswordSaslPlain PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidLoginModule STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
testInvalidLoginModule PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeader STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslPlainPlaintextClientWithoutSaslAuthenticateHeader PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslPlainSslClientWithoutSaslAuthenticateHeader STARTED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 
oldSaslPlainSslClientWithoutSaslAuthenticateHeader PASSED

org.apache.kafka.common.security.authenticator.SaslAuthenticatorTest > 

[GitHub] kafka pull request #4262: MINOR: Temporarily disable testLogStartOffsetCheck...

2017-11-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4262


---


[jira] [Resolved] (KAFKA-6248) Enable configuration of internal topics of Kafka Streams applications

2017-11-24 Thread Tim Van Laer (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6248?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tim Van Laer resolved KAFKA-6248.
-
Resolution: Not A Problem

As this is supported in Kafka Streams 1.0.0, I close the stream.

> Enable configuration of internal topics of Kafka Streams applications
> -
>
> Key: KAFKA-6248
> URL: https://issues.apache.org/jira/browse/KAFKA-6248
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Tim Van Laer
>Priority: Minor
>
> In the current implementation of Kafka Streams, it is not possible to set 
> custom configuration to internal topics (e.g. max.message.bytes, 
> retention.ms...). It would be nice if a developer can set some specific 
> configuration. 
> E.g. if you want to store messages bigger than 1MiB in a state store, you 
> have to alter the corresponding changelog topic with a max.message.bytes 
> setting. 
> The workaround is to create the 'internal' topics upfront using the correct 
> naming convention so Kafka Streams will use the explicitly defined topics as 
> if they are internal. 
> An alternative is to alter the internal topics after the Kafka Streams 
> application is started and has created its internal topics. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Wiki permissions

2017-11-24 Thread Skrzypek, Jonathan
Hi,

Could I get the permissions to create pages on the Kafka confluence ?
I would like the submit a KIP.
My wiki id is Jonathan Skrzypek

Regards,

Jonathan Skrzypek
Middleware Engineering
Messaging Engineering
Goldman Sachs International
Christchurch Court - 10-15 Newgate Street
London EC1A 7HD
Tel: +442070512977



[GitHub] kafka pull request #4263: KAFKA-6241: Enable dynamic updates of broker SSL k...

2017-11-24 Thread rajinisivaram
GitHub user rajinisivaram opened a pull request:

https://github.com/apache/kafka/pull/4263

KAFKA-6241: Enable dynamic updates of broker SSL keystore

Enable dynamic broker configuration (see KIP-225 for details). Includes
 - Base implementation to allow specific broker configs and custom configs 
to be dynamically updated
 - Extend DescribeConfigsRequest/Response to return all synonym configs and 
their sources in the order of precedence
 - Extend AdminClient to alter dynamic broker configs
 - Dynamic update of SSL keystores

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/rajinisivaram/kafka 
KAFKA-6241-dynamic-keystore

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4263.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4263


commit 48a12506384d753a1ece6a92fcc59c3f60a35a81
Author: Rajini Sivaram 
Date:   2017-11-20T17:06:31Z

KAFKA-6241: Enable dynamic updates of broker SSL keystore




---


[GitHub] kafka pull request #4262: MINOR: Temporarily disable testLogStartOffsetCheck...

2017-11-24 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/4262

MINOR: Temporarily disable testLogStartOffsetCheckpoint

It's failing often and it seems like there are multiple
reasons. PR #4238 will re-enable it.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka 
temporarily-disable-test-log-start-offset-checkpoint

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4262.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4262


commit a24d17011c886ef66a099972d76ee2962f9839e2
Author: Ismael Juma 
Date:   2017-11-24T14:28:50Z

MINOR: Temporarily disable testLogStartOffsetCheckpoint

It's failing often and it seems like there are multiple
reasons. PR #4238 will re-enable it.




---


[GitHub] kafka pull request #4261: MINOR: Avoid intermediate strings when parsing/dec...

2017-11-24 Thread ijuma
GitHub user ijuma opened a pull request:

https://github.com/apache/kafka/pull/4261

MINOR: Avoid intermediate strings when parsing/decoding ZK JSON

Also:
- Fix bug in result type of `createSequentialPersistentPath`
- Remove duplicated code from `ReplicationUtils`
- Move `propagateIsrChanges` from `ReplicationUtils` to `KafkaZkClient`
- Add tests
- Minor clean-ups

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation 
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ijuma/kafka zk-data-improvements

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/4261.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4261


commit df33ed9ef331931cc32a5310d2717f1c65fae202
Author: Ismael Juma 
Date:   2017-11-24T12:52:06Z

Avoid intermediate Strings when parsing/encoding JSON

commit 2e7f1913fde1a5a041e382e21d6355c322815b97
Author: Ismael Juma 
Date:   2017-11-24T12:52:34Z

Minor clean-ups

commit fc2c8f8f153ec6a01477445c426115687680b7bd
Author: Ismael Juma 
Date:   2017-11-24T13:18:09Z

Add test for `createSequentialPersistentPath` and fix bug

commit 8296e3e98121fadacdff00def7277c2f852b8079
Author: Ismael Juma 
Date:   2017-11-24T13:55:39Z

Remove isr change duplication and more tests




---


Build failed in Jenkins: kafka-trunk-jdk8 #2237

2017-11-24 Thread Apache Jenkins Server
See 


Changes:

[ismael] KAFKA-6074; Use ZookeeperClient in ReplicaManager and Partition

--
[...truncated 400.89 KB...]
kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testJavaConversions STARTED

kafka.security.auth.OperationTest > testJavaConversions PASSED

kafka.security.auth.AclTest > testAclJsonConversion STARTED

kafka.security.auth.AclTest > testAclJsonConversion PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils STARTED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testZkMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testChroot STARTED

kafka.security.auth.ZkAuthorizationTest > testChroot PASSED

kafka.security.auth.ZkAuthorizationTest > testDelete STARTED

kafka.security.auth.ZkAuthorizationTest > testDelete PASSED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive STARTED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclInheritance PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls FAILED
java.lang.AssertionError: expected acls Set(User:36 has Allow permission 
for operations: Read from hosts: *, User:7 has Allow permission for operations: 
Read from hosts: *, User:21 has Allow permission for operations: Read from 
hosts: *, User:39 has Allow permission for operations: Read from hosts: *, 
User:43 has Allow permission for operations: Read from hosts: *, User:3 has 
Allow permission for operations: Read from hosts: *, User:35 has Allow 
permission for operations: Read from hosts: *, User:15 has Allow permission for 
operations: Read from hosts: *, User:16 has Allow permission for operations: 
Read from hosts: *, User:22 has Allow permission for operations: Read from 
hosts: *, User:26 has Allow permission for operations: Read from hosts: *, 
User:11 has Allow permission for operations: Read from hosts: *, User:38 has 
Allow permission for operations: Read from hosts: *, User:8 has Allow 
permission for operations: Read from hosts: *, User:28 has Allow permission for 
operations: Read from hosts: *, User:32 has Allow permission for operations: 
Read from hosts: *, User:25 has Allow permission for operations: Read from 
hosts: *, User:41 has Allow permission for operations: Read from hosts: *, 
User:44 has Allow permission for operations: Read from hosts: *, 

[GitHub] kafka pull request #4254: KAFKA-6074 Use ZookeeperClient in ReplicaManager a...

2017-11-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/4254


---


Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams

2017-11-24 Thread Jan Filipiak
Cleary we show the oldValue to the user. We have to, because we filter 
after the store.

https://github.com/axbaretto/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableFilter.java#L96

I cannot help you following this. It is really obvious and I am running 
out of tools for explaining.


Thanks for understanding my point to put filter before. Not only would 
it make the store smaller. It would make this feature reasonably 
possible and the framework easier. Interestingly it would also help to 
move IQ into more reasonable directions. And it might help understand 
that we do not need any intermediate representation of the topology,


KIP-182 I have no clue what everyone has with their "bytestores" so 
broken. But putting another store after doesn't help when the store 
before is the problem.





On 24.11.2017 05:08, Matthias J. Sax wrote:

 From a DSL point of view, users only see the new value on a
KTable#filter anyway. So why should it be an issue that we use
 pair under the hood?

User sees newValue and gets corresponding RecordContext. I can't see any
issue here?

I cannot follow here:


Even when we have a statefull operation last. We move it to the very
first processor (KtableSource)
and therefore cant present a proper RecordContext.



With regard to `builder.table().filter()`:

I see you point that it would be good to be able to apply the filter()
first to reduce the stat store size of the table. But how is this
related to KIP-159?

Btw: with KIP-182, I am wondering if this would not be possible, by
putting a custom dummy store into the table and materialize the filter
result afterwards? It's not a nice way to do, but seems to be possible.


-Matthias

On 11/23/17 4:56 AM, Jan Filipiak wrote:

The comment is valid. It falls exactly into this topic, it has exactly
todo with this!
Even when we have a statefull operation last. We move it to the very
first processor (KtableSource)
and therefore cant present a proper RecordContext.

Regarding the other Jiras you are referring to. They harm the project
more than they do good!
There is no need for this kind of optimizer and meta representation and
what not. I hope they
never get implemented.

Best Jan


On 22.11.2017 14:44, Damian Guy wrote:

Jan, i think you comment with respect to filtering is valid, though
not for
this KIP. We have separate JIRAs for topology optimization of which this
falls into.

Thanks,
Damian

On Wed, 22 Nov 2017 at 02:25 Guozhang Wang  wrote:


Jan,

Not sure I understand your argument that "we still going to present
change.oldValue to the filter even though the record context() is for
change.newValue". Are you referring to `KTableFilter#process()`? If yes
could you point to me which LOC are you concerning about?


Guozhang


On Mon, Nov 20, 2017 at 9:29 PM, Jan Filipiak 
wrote:


a remark of mine that got missed during migration:

There is this problem that even though we have source.table.filter.join
the state-fullness happens at the table step not a the join step. In a
filter
we still going to present change.oldValue to the filter even though the
record context() is for change.newValue. I would go as far as applying
the filter before the table processor. Not to just get KIP-159, but

because

I think its a side effect of a non ideal topology layout. If i can
filter
99% of my
records. my state could be way smaller. Also widely escalates the
context
of the KIP

I can only see upsides of executing the filter first.

Best Jan



On 20.11.2017 22:22, Matthias J. Sax wrote:


I am moving this back to the DISCUSS thread... Last 10 emails were
sent
to VOTE thread.

Copying Guozhang's last summary below. Thanks for this summary. Very
comprehensive!

It seems, we all agree, that the current implementation of the context
at PAPI level is ok, but we should not leak it into DSL.

Thus, we can go with (2) or (3), were (3) is an extension to (2)
carrying the context to more operators than just sources. It also
seems,
that we all agree, that many-to-one operations void the context.

I still think, that just going with plain (2) is too restrictive --
but
I am also fine if we don't go with the full proposal of (3).

Also note, that the two operators filter() and filterNot() don't
modify
the record and thus for both, it would be absolutely valid to keep the
context.

I personally would keep the context for at least all one-to-one
operators. One-to-many is debatable and I am fine to not carry the
context further: at least the offset information is questionable for
this case -- note thought, that semantically, the timestamp is
inherited
via one-to-many, and I also think this applies to "topic" and
"partition". Thus, I think it's still valuable information we can
carry
downstreams.


-Matthias

Jan: which approach are you referring to as "the approach that is
on the

table would be perfect"?

Note that in today's PAPI layer we are already