[jira] [Commented] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-10-09 Thread Greg Fodor (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560765#comment-15560765
 ] 

Greg Fodor commented on KAFKA-4281:
---

PR: https://github.com/apache/kafka/pull/1998

If this approach seems sane, please take a look especially at the window 
variants -- I am not too familiar with those APIs.
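
As a rough illustration of the intent (hypothetical call site only; the actual
method signatures are the ones added in the PR, and "stream" here is an assumed
KStream<String, String>):

    // Hypothetical call site; the real signatures are whatever PR #1998 defines.
    KTable<String, Long> counts = stream
        .groupByKey(Serdes.String(), Serdes.String(), true /* forwardImmediately */)
        .count("config-counts");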

> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. Unfortunately, adding this 
> required touching a large number of files, and it effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table, but this didn't work as 
> expected: for the latency to go away, the KTableImpl itself must also mark its 
> source as forward-immediate (otherwise we would still see latency because the 
> materialization of the KTableSource still relies on state store flushes to 
> propagate).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-09 Thread Greg Fodor
JIRA opened here: https://issues.apache.org/jira/browse/KAFKA-4281

On Sun, Oct 9, 2016 at 2:02 AM, Greg Fodor  wrote:
> I went ahead and did some more testing, and it feels to me one option
> for resolving this issue is having a method on KGroupedStream which
> can be used to configure if the operations on it (reduce/aggregate)
> will forward immediately or not. I did a quick patch and was able to
> determine that if the records are forwarded immediately it resolves
> the issue I am seeing. Having it be done on a per-KGroupedStream basis
> would provide maximum flexibility.
>
> On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor  wrote:
>> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
>> I'm hitting what seems to be a serious issue (at least, for us) with
>> the changes brought about in KIP-63. In our job, we have a number of
>> steps in the topology where we perform a repartition and aggregation
>> on topics that require low latency. These topics have a very low
>> message volume but require subsecond latency for the aggregations to
>> complete since they are configuration data that drive the rest of the
>> job and need to be applied immediately.
>>
>> In 0.10.0, we performed a through (for repartitioning) and aggregateBy
>> and this resulted in minimal latency as the aggregateBy would just
>> result in a consumer attached to the output of the through and the
>> processor would consume + aggregate messages immediately passing them
>> to the next step in the topology.
>>
>> However, in 0.10.1 the aggregateBy API is no longer available and it
>> is necessary to pivot the data through a groupByKey and then
>> aggregate(). The problem is that this mechanism results in the
>> intermediate KTable state store storing the data as usual, but the
>> data is not forwarded downstream until the next store flush. (Due to
>> the use of ForwardingCacheFlushListener instead of calling forward()
>> during the process of the record.)
>>
>> As noted in KIP-63 and as I saw in the code, the flush interval of
>> state stores is commit.interval.ms. For us, this has been tuned to a
>> few seconds, and since we have a number of these aggregations in our
>> job sequentially, this now results in many seconds of latency in the
>> worst case for a tuple to travel through our topology.
>>
>> It seems too inflexible to have the flush interval always be the same
>> as the commit interval across all aggregates. For certain aggregations
>> which are idempotent regardless of messages being reprocessed, being
>> able to flush more often than the commit interval seems like a very
>> important option when lower latency is required. It would still make
>> sense to flush every commit as well, but having an additional
>> configuration to set the maximum time between state store flushes
>> seems like it would solve our problem.
>>
>> In our case, we'd set our flush interval to a few hundred ms. Ideally,
>> we would really prefer to be able to disable interval based flushing
>> altogether (and just put + forward all processed records) for certain
>> KTables that are low volume, latency sensitive, and which are
>> idempotent under message reprocessing.
>>
>> Thanks for any help! Right now the only option it seems is for us to
>> radically lower the commit interval and accept any leftover latency,
>> but unless we can find a sweet spot this may be a blocker for us to
>> moving to 0.10.1.
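
For reference, a minimal sketch of the 0.10.1 pattern discussed in the quoted
message above, with the commit interval lowered as the workaround mentioned.
The application id, topic name, serdes, and store names are assumptions for
illustration; the commented-out line is the KIP-63 record cache setting, which
should also cause eager forwarding if set to 0, at the cost of deduplication.

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.kstream.KTable;

    public class LowLatencyAggSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "low-latency-agg");   // assumed name
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // In 0.10.1 downstream forwarding of aggregation results is tied to the
            // commit interval, so lowering it is the main knob discussed above.
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
            // Setting the KIP-63 record cache to 0 should also forward records eagerly
            // (losing deduplication); worth verifying against your version:
            // props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

            KStreamBuilder builder = new KStreamBuilder();
            // 0.10.1 replacement for the old through(...) + aggregateBy(...) pattern:
            KTable<String, Long> counts = builder
                .stream(Serdes.String(), Serdes.String(), "config-topic")        // assumed topic
                .groupByKey(Serdes.String(), Serdes.String())
                .count("config-counts");

            new KafkaStreams(builder, props).start();
        }
    }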


[jira] [Commented] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-10-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560764#comment-15560764
 ] 

ASF GitHub Bot commented on KAFKA-4281:
---

GitHub user gfodor opened a pull request:

https://github.com/apache/kafka/pull/1998

KAFKA-4281: Should be able to forward aggregation values immediately

https://issues.apache.org/jira/browse/KAFKA-4281

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AltspaceVR/kafka KAFKA-4281

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1998.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1998


commit dfe004a24ff6491f286ac9fd405b6a1cae8ae2f5
Author: Greg Fodor 
Date:   2016-10-09T22:46:02Z

Added forwardImmediately argument to various grouping APIs to allow users 
to specify that records should be immediately forwarded during aggregations, etc




> Should be able to forward aggregation values immediately
> 
>
> Key: KAFKA-4281
> URL: https://issues.apache.org/jira/browse/KAFKA-4281
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0
>Reporter: Greg Fodor
>Assignee: Guozhang Wang
>
> KIP-63 introduced changes to the behavior of aggregations such that the 
> result of aggregations will not appear to subsequent processors until a state 
> store flush occurs. This is problematic for latency sensitive aggregations 
> since flushes occur generally at commit.interval.ms, which is usually a few 
> seconds. Combined with several aggregations, this can result in several 
> seconds of latency through a topology for steps dependent upon aggregations.
> Two potential solutions:
> - Allow finer control over the state store flushing intervals
> - Allow users to change the behavior so that certain aggregations will 
> immediately forward records to the next step (as was the case pre-KIP-63)
> A PR is attached that takes the second approach. Unfortunately, adding this 
> required touching a large number of files, and it effectively doubles the 
> number of method signatures around grouping on KTable and KStream. I tried an 
> alternative approach that let the user opt in to immediate forwarding via an 
> additional builder method on KGroupedStream/Table, but this didn't work as 
> expected: for the latency to go away, the KTableImpl itself must also mark its 
> source as forward-immediate (otherwise we would still see latency because the 
> materialization of the KTableSource still relies on state store flushes to 
> propagate).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1998: KAFKA-4281: Should be able to forward aggregation ...

2016-10-09 Thread gfodor
GitHub user gfodor opened a pull request:

https://github.com/apache/kafka/pull/1998

KAFKA-4281: Should be able to forward aggregation values immediately

https://issues.apache.org/jira/browse/KAFKA-4281

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/AltspaceVR/kafka KAFKA-4281

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1998.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1998


commit dfe004a24ff6491f286ac9fd405b6a1cae8ae2f5
Author: Greg Fodor 
Date:   2016-10-09T22:46:02Z

Added forwardImmediately argument to various grouping APIs to allow users 
to specify that records should be immediately forwarded during aggregations, etc




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Created] (KAFKA-4281) Should be able to forward aggregation values immediately

2016-10-09 Thread Greg Fodor (JIRA)
Greg Fodor created KAFKA-4281:
-

 Summary: Should be able to forward aggregation values immediately
 Key: KAFKA-4281
 URL: https://issues.apache.org/jira/browse/KAFKA-4281
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Greg Fodor
Assignee: Guozhang Wang


KIP-63 introduced changes to the behavior of aggregations such that the result 
of aggregations will not appear to subsequent processors until a state store 
flush occurs. This is problematic for latency sensitive aggregations since 
flushes occur generally at commit.interval.ms, which is usually a few seconds. 
Combined with several aggregations, this can result in several seconds of 
latency through a topology for steps dependent upon aggregations.

Two potential solutions:
- Allow finer control over the state store flushing intervals
- Allow users to change the behavior so that certain aggregations will 
immediately forward records to the next step (as was the case pre-KIP-63)

A PR is attached that takes the second approach. Unfortunately, adding this 
required touching a large number of files, and it effectively doubles the 
number of method signatures around grouping on KTable and KStream. I tried an 
alternative approach that let the user opt in to immediate forwarding via an 
additional builder method on KGroupedStream/Table, but this didn't work as 
expected: for the latency to go away, the KTableImpl itself must also mark its 
source as forward-immediate (otherwise we would still see latency because the 
materialization of the KTableSource still relies on state store flushes to 
propagate).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Build failed in Jenkins: kafka-trunk-jdk8 #959

2016-10-09 Thread Apache Jenkins Server
See 

Changes:

[jason] MINOR: Update Quickstart in documentation to account for Windows

--
[...truncated 5509 lines...]
kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededToReadFromNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededToReadFromNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionWithTopicDescribeOnlyAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > testDeleteWithWildCardAuth STARTED

kafka.api.AuthorizerIntegrationTest > testDeleteWithWildCardAuth PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicRead STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicRead PASSED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededForWritingToNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededForWritingToNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionMatchingInternalTopicWithDescribeOnlyPermission PASSED

kafka.api.AuthorizerIntegrationTest > 
testSimpleConsumeWithOffsetLookupAndNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > 
testSimpleConsumeWithOffsetLookupAndNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicNotExisting 
STARTED

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicNotExisting 
PASSED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionNotMatchingInternalTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testPatternSubscriptionNotMatchingInternalTopic PASSED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithoutDescribe 
STARTED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithoutDescribe 
PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithoutTopicDescribeAccess 
STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithoutTopicDescribeAccess 
PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithTopicAndGroupRead 
STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithTopicAndGroupRead 
PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testAuthorizationWithTopicExisting STARTED

kafka.api.AuthorizerIntegrationTest > 

Build failed in Jenkins: kafka-0.10.1-jdk7 #58

2016-10-09 Thread Apache Jenkins Server
See 

Changes:

[jason] MINOR: Update Quickstart in documentation to account for Windows

--
[...truncated 6445 lines...]
kafka.api.test.ProducerCompressionTest > testCompression[0] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[0] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[1] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[1] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[2] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[2] PASSED

kafka.api.test.ProducerCompressionTest > testCompression[3] STARTED

kafka.api.test.ProducerCompressionTest > testCompression[3] PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaSubscribe STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaSubscribe PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testProduceConsumeViaAssign 
STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testProduceConsumeViaAssign 
PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaAssign STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaAssign PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithDescribeAclViaSubscribe PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoConsumeWithoutDescribeAclViaAssign PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testNoGroupAcl STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testNoGroupAcl PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl 
STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > testNoProduceWithDescribeAcl 
PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testProduceConsumeViaSubscribe STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testProduceConsumeViaSubscribe PASSED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoProduceWithoutDescribeAcl STARTED

kafka.api.SaslPlainSslEndToEndAuthorizationTest > 
testNoProduceWithoutDescribeAcl PASSED

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures STARTED

kafka.api.ConsumerBounceTest > testSeekAndCommitWithBrokerFailures PASSED

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures STARTED

kafka.api.ConsumerBounceTest > testConsumptionWithBrokerFailures PASSED

kafka.api.SaslPlaintextConsumerTest > testCoordinatorFailover STARTED

kafka.api.SaslPlaintextConsumerTest > testCoordinatorFailover PASSED

kafka.api.SaslPlaintextConsumerTest > testSimpleConsumption STARTED

kafka.api.SaslPlaintextConsumerTest > testSimpleConsumption PASSED

kafka.api.SaslMultiMechanismConsumerTest > testMultipleBrokerMechanisms STARTED

kafka.api.SaslMultiMechanismConsumerTest > testMultipleBrokerMechanisms PASSED

kafka.api.SaslMultiMechanismConsumerTest > testCoordinatorFailover STARTED

kafka.api.SaslMultiMechanismConsumerTest > testCoordinatorFailover PASSED

kafka.api.SaslMultiMechanismConsumerTest > testSimpleConsumption STARTED

kafka.api.SaslMultiMechanismConsumerTest > testSimpleConsumption PASSED

kafka.api.FetchRequestTest > testShuffleWithSingleTopic STARTED

kafka.api.FetchRequestTest > testShuffleWithSingleTopic PASSED

kafka.api.FetchRequestTest > testShuffle STARTED

kafka.api.FetchRequestTest > testShuffle PASSED

kafka.api.UserClientIdQuotaTest > testProducerConsumerOverrideUnthrottled 
STARTED

kafka.api.UserClientIdQuotaTest > testProducerConsumerOverrideUnthrottled PASSED

kafka.api.UserClientIdQuotaTest > testThrottledProducerConsumer STARTED

kafka.api.UserClientIdQuotaTest > testThrottledProducerConsumer PASSED

kafka.api.UserClientIdQuotaTest > testQuotaOverrideDelete STARTED

kafka.api.UserClientIdQuotaTest > testQuotaOverrideDelete PASSED

kafka.api.SslProducerSendTest > testSendNonCompressedMessageWithCreateTime 
STARTED

kafka.api.SslProducerSendTest > testSendNonCompressedMessageWithCreateTime 
PASSED

kafka.api.SslProducerSendTest > testClose STARTED

kafka.api.SslProducerSendTest > testClose PASSED

kafka.api.SslProducerSendTest > testFlush STARTED

kafka.api.SslProducerSendTest > testFlush PASSED

kafka.api.SslProducerSendTest > testSendToPartition STARTED

kafka.api.SslProducerSendTest > testSendToPartition PASSED

kafka.api.SslProducerSendTest > testSendOffset STARTED

kafka.api.SslProducerSendTest > testSendOffset PASSED

kafka.api.SslProducerSendTest > testSendCompressedMessageWithCreateTime STARTED

kafka.api.SslProducerSendTest > testSendCompressedMessageWithCreateTime PASSED

kafka.api.SslProducerSendTest > testCloseWithZeroTimeoutFromCallerThread STARTED


Build failed in Jenkins: kafka-trunk-jdk7 #1614

2016-10-09 Thread Apache Jenkins Server
See 

Changes:

[jason] MINOR: Update Quickstart in documentation to account for Windows

--
[...truncated 14045 lines...]
org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testStickiness PASSED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithStandby STARTED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithStandby PASSED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithoutStandby STARTED

org.apache.kafka.streams.processor.internals.assignment.TaskAssignorTest > 
testAssignWithoutStandby PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingMultiplexingTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingMultiplexingTopology PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingStatefulTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingStatefulTopology PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingSimpleTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingSimpleTopology PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingSimpleMultiSourceTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingSimpleMultiSourceTopology PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testTopologyMetadata STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testTopologyMetadata PASSED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingMultiplexByNameTopology STARTED

org.apache.kafka.streams.processor.internals.ProcessorTopologyTest > 
testDrivingMultiplexByNameTopology PASSED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
testSpecificPartition STARTED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
testSpecificPartition PASSED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
shouldThrowStreamsExceptionAfterMaxAttempts STARTED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
shouldThrowStreamsExceptionAfterMaxAttempts PASSED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
shouldRetryWhenTimeoutExceptionOccursOnSend STARTED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
shouldRetryWhenTimeoutExceptionOccursOnSend PASSED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
testStreamPartitioner STARTED

org.apache.kafka.streams.processor.internals.RecordCollectorTest > 
testStreamPartitioner PASSED

org.apache.kafka.streams.processor.internals.PunctuationQueueTest > 
testPunctuationInterval STARTED

org.apache.kafka.streams.processor.internals.PunctuationQueueTest > 
testPunctuationInterval PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException 
STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException 
PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException PASSED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException STARTED

org.apache.kafka.streams.processor.internals.AbstractTaskTest > 
shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldMapUserEndPointToTopicPartitions STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldMapUserEndPointToTopicPartitions PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldAddUserDefinedEndPointToSubscription STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
shouldAddUserDefinedEndPointToSubscription PASSED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 
testAssignWithStandbyReplicas STARTED

org.apache.kafka.streams.processor.internals.StreamPartitionAssignorTest > 

Re: [DISCUSS] KIP-82 - Add Record Headers

2016-10-09 Thread K Burstev
I think we have a situation here like the one Amazon SQS had: it originally did not 
support headers, but as time passed and requirements grew the need became evident, 
and headers (by whatever name they go) were finally added back in 2014.
 
Here is a blog post from when SQS first added support for "header" attributes, 
including some basic use cases for why they decided to add them:
https://aws.amazon.com/blogs/aws/simple-queue-service-message-attributes/
 
I am sure they "passed" on adding it before too, but as the use cases and the 
product mature, it was inevitable they would be added, and they were. I think 
Kafka is now at this stage.
 
The fact that we all have these wrapper workarounds is expensive and does not solve 
our problems:
 
* every single company essentially re-implements the wheel to be able to send 
message metadata
* with no common interface, an ecosystem of plugins/interceptors that use them 
cannot evolve (again, everyone's is custom, but no doubt doing the same thing)
* we cannot convince third-party commercial vendors to invest in adding support, 
as they don't want to write code against custom code written by my company and 
get no re-use
* workarounds cause production issues (compaction is just one noted point)

Headers really are a simple, elegant, and common solution in my view; they address 
all of my problems above and, reading the KIP, many more needs and use cases.
 
It is sometimes too easy to simply say no without providing an alternative, or to 
dismiss people's real use cases. At the moment I don't see any sensible alternative 
proposal or commitment.
 
Here we have someone/a company addressing a real, common need and willing to 
implement the solution. The design also seems fairly advanced and simply needs the 
finer details discussed. I'll be honest, I haven't fully reviewed the sample code, 
but so far it seems not very invasive and could be in the next release.
 
This is why I am +1 for the KIP.
 
As for the discussion about the actual implementation details:
 
For our headers in Kafka, maybe everyone could read:
http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/SQSMessageAttributes.html
 
I quite like that a type for the value is passed along with the key and value, 
which means you don't need to know the type of the value ahead of time when 
consuming the header. I'm not saying we have to have them, but I think it is 
worth a thought.
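
As a purely hypothetical illustration of that key/type/value shape (this is not 
an existing Kafka API or anything taken from the KIP text):

    // Hypothetical typed header, mirroring SQS message attributes; illustrative only.
    public final class TypedHeader {
        public final String key;
        public final String type;   // e.g. "String", "Number", "Binary"
        public final byte[] value;

        public TypedHeader(String key, String type, byte[] value) {
            this.key = key;
            this.type = type;
            this.value = value;
        }
    }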
 
Kostya


08.10.2016, 00:37, "Nacho Solis" :
> On Fri, Oct 7, 2016 at 8:45 AM, Jay Kreps  wrote:
>
>>  This discussion has come up a number of times and we've always passed.
>
> ​Hopefully this time the arguments will be convincing enough that Kafka can
> decide to do something about it.
> ​
>
>>  One of things that has helped keep Kafka simple is not adding in new
>>  abstractions and concepts except when the proposal is really elegant and
>>  makes things simpler.
>
> ​I completely agree that we want things to be simple and elegant. This is
> exactly what headers provide.
>
> Headers are a clean way to extend the system without sacrificing
> performance or elegance. They are modular and backwards compatible.
>
> ​​
>
>>  Consider three use cases for headers:
>>
>>  ​​
>>   1. Kafka-scope: We want to add a feature to Kafka that needs a
>> particular field.
>
> ​This is a _great_ use case for Kafka headers. Having headers means that
> you can have features that are optional. Features that are slowly deployed
> without needing to move everybody from one protocol version to another
> protocol version. All clients don't have to change and all brokers don't
> have to change.
>
> Without headers you need to parse the messages differently. With headers
> you use the same parser.
> I assume I don't need to get into how this makes the system extensible
> without requiring others to use the same extensions you have.
>
> ​
>
>> 2. Company-scope: You want to add a header to be shared by everyone in
>> your company.
>
> ​It is completely true that for client-side things you don't need a
> architectural header system. You could just write a wrapper and
> encapsulate every message you send. You could achieve end-to-end. Even if
> this end-to-end exists, Kafka currently offers no way to identify the type
> of a message (which I wish we could change), so we have to rely on some
> magic number to identify the type. Once we have that we can have a header
> system.
>
> Avro is useful for encoding schema based systems, but it's not as useful
> for modularity and it's not universal. We have a number of use cases that
> don't use avro (and don't want to). They want to send binary data, but from
> an org perspective still need some structure to be added for accounting,
> tracing, auditing, security, etc. There is some of this data that would
> also be useful at the broker side. This is somewhat problematic at this
> point (say, using a 

Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-09 Thread Becket Qin
Hey David,

Thanks for updating the wiki.

1. I was actually thinking of letting every broker just consume the
__consumer_offsets topic. But it seems less efficient if there are only a
few topics configured for committed offsets based retention. So querying
the committed offsets seems reasonable. From the wiki it is not clear
whether the committed offset query happens sync or async. It is probably
better to do this asynchronously, i.e. in a thread other than the log-deleting
thread. Otherwise, querying the committed offsets may slow down or even
potentially block log deletion due to a remote call failure (see the sketch
after these comments).

2. Using the new consumer does not necessarily introduce a new group unless we
use Kafka-based group management. But using KafkaConsumer directly to query
the committed offsets may not work in this case because by default it uses
the consumer group in the ConsumerConfig. We can use NetworkClient and see
if we can reuse some of the code in the new consumer. Since there has been
a lot of effort spent on deprecating the SimpleConsumer, we probably want
to avoid introducing any new usage. Anyway, this is an implementation detail
and we can figure it out when writing the patch.

3. What I am thinking is that we want to consider whether we will allow
multiple policies to be set at the same time. If we do allow that, which
one of the policies will take precedence? Otherwise it might be confusing
for users if they have multiple retention policies set.

In addition to the above, it seems that we need some way to configure the
set of consumer groups a topic should take into account. If it is through a
topic config, it would be good to document the configuration name and the
format of its value in the wiki as well.
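
On point 1, a purely illustrative sketch of the asynchronous pattern in plain
Java (not broker code; the actual OffsetFetch lookup is left abstract as a
supplier):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.LongSupplier;

    // Illustrative only: refresh the minimum committed offset for a partition in the
    // background so the log-deletion path reads a local value and never blocks on a
    // remote call.
    public class CommittedOffsetCache {
        private final AtomicLong minCommittedOffset = new AtomicLong(-1L);
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

        public void start(LongSupplier remoteMinOffsetFetch, long intervalMs) {
            scheduler.scheduleAtFixedRate(
                () -> minCommittedOffset.set(remoteMinOffsetFetch.getAsLong()),
                0L, intervalMs, TimeUnit.MILLISECONDS);
        }

        // Called from the log-deletion thread: no remote call, just a cached read.
        public boolean safeToDelete(long segmentEndOffset) {
            long committed = minCommittedOffset.get();
            return committed >= 0 && segmentEndOffset <= committed;
        }
    }

With this shape, a slow or failing remote fetch only makes the cached offset
staler; it never blocks the deletion thread itself.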

Thanks,

Jiangjie (Becket) Qin



On Sun, Oct 9, 2016 at 7:14 AM, 东方甲乙 <254479...@qq.com> wrote:

> Hi Becket,
> This is David; thanks for the comments.  I have updated some info in
> the wiki. All the changes are now described in the workflow.
> Answers to the comments:
> 1. Every broker only has some of the groups' committed offsets, which are
> stored in the __consumer_offsets topic; it still has to query the other
> coordinators (other brokers) for some groups' committed offsets.
> So we use the OffsetFetchRequest to query one group's committed offset.
>
>
> 2. Using the new consumer to query the committed offsets would introduce a
> new group, but if we use the OffsetFetchRequest to query (like the
> consumer-offset-checker tool: first find the coordinator and build a
> channel to query), we will not introduce a new group.
>
>
> 3. I think KIP-47's functionality is a little different from this
> KIP, though we are both modifying the log retention.
>
>
> Thanks,
> David.
>
>
>
>
>
>
>
>
> -- Original Message --
> From: "Becket Qin"
> Sent: Sunday, October 9, 2016, 1:00 PM
> To: "dev"
>
> Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention
>
>
>
> Hi David,
>
> Thanks for the explanation. Could you update the KIP-68 wiki to include the
> changes that need to be made?
>
> I have a few more comments below:
>
> 1. We already have an internal topic __consumer_offsets to store all the
> committed offsets. So the brokers can probably just consume from that to
> get the committed offsets for all the partitions of each group.
>
> 2. It is probably better to use o.a.k.clients.consumer.KafkaConsumer
> instead of SimpleConsumer. It handles all the leader movements and
> potential failures.
>
> 3. KIP-47 also has a proposal for a new time based log retention policy and
> propose a new configuration on log retention. It may be worth thinking
> about the behavior together.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L)  wrote:
>
> > Hi Becket,
> >
> >   Thanks for the feedback:
> > 1.  We use the simple consumer API to query the committed offset, so we
> > don't need to specify the consumer group.
> > 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> > the committed offset in the log retention process.  The client can commit
> > offsets or not.
> > 3.  It does not need to distinguish follower brokers from leader
> > brokers; every broker can query.
> > 4.  We don't need to change the protocols; we mainly change the log
> > retention process in the log manager.
> >
> >   One open question is that querying the min offset needs O(partitions *
> > groups) time complexity; an alternative is to build an internal topic to
> > save every partition's min offset, which reduces this to O(1).
> > I will update the wiki for more details.
> >
> > Thanks,
> > David
> >
> >
> > > Hi Pengwei,
> > >
> > > Thanks for the KIP proposal. It is a very useful KIP. At a high level,
> > the
> > > proposed behavior looks reasonable to me.
> > >
> > > However, it seems that some of the details are not mentioned in the
> KIP.
> > > For example,
> > >
> > > 1. How will the expected consumer group be 

[jira] [Commented] (KAFKA-4217) KStream.transform equivalent of flatMap

2016-10-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15560348#comment-15560348
 ] 

Guozhang Wang commented on KAFKA-4217:
--

We designed {{process()}} to be differentiated from {{transform()}} such that 
{{process}} is used as the "end point" of the topology, returning no new data, 
whereas {{transform}} is used as a connection point of the topology.

I think it is reasonable to extend {{Transformer.transform}} and 
{{Transformer.punctuate}} to return a list of {{R}} typed values.
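
For reference, a sketch of the workaround described in the issue (the types and
the split logic are illustrative only):

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.Transformer;
    import org.apache.kafka.streams.processor.ProcessorContext;

    // Illustrative workaround: return null from transform() and emit any number of
    // records via context.forward() instead.
    public class SplittingTransformer
            implements Transformer<String, String, KeyValue<String, String>> {
        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            for (String part : value.split(",")) {
                context.forward(key, part);   // zero or more outputs per input
            }
            return null;                      // nothing is returned directly
        }

        @Override
        public KeyValue<String, String> punctuate(long timestamp) {
            return null;                      // no periodic output
        }

        @Override
        public void close() {}
    }

It would be wired in via something like stream.transform(SplittingTransformer::new, 
"some-store") when state store access is needed (store name illustrative).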

> KStream.transform equivalent of flatMap
> ---
>
> Key: KAFKA-4217
> URL: https://issues.apache.org/jira/browse/KAFKA-4217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Guozhang Wang
>
> {{KStream.transform}} gives you access to state stores while allowing you to 
> return zero or one transformed {{KeyValue}}.  Alas, it is unclear what method 
> you should use if you want to access state stores and return zero or multiple 
> {{KeyValue}}.  Presumably you can use {{transform}}, always return {{null}}, 
> and use {{ProcessorContext.forward}} to emit {{KeyValues}}.
> It may be good to introduce a {{transform}}-like {{flatMap}} equivalent, or 
> allow store access from other {{KStream}} methods, such as {{flatMap}} itself.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1990: MINOR: Update Quickstart in documentation to accou...

2016-10-09 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1990


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Temporary node of broker in ZooKeeper lost

2016-10-09 Thread 隋玉增
Hi,
   I hit an issue where the temporary (ephemeral) node of the broker in ZooKeeper was 
lost while the broker process still existed. At this point the controller considers 
the broker to be offline.  According to the zkClient log, the session timed out, but 
handleStateChanged and handleNewSession (in KafkaHealthcheck) were not called after 
the session was re-established. The following is the zkClient log: "zk state changed" 
and "re-registering broker info in zk" were not printed after SyncConnected.
   
   So I want to check the node periodically and call the register() function (in 
KafkaHealthcheck) when the node does not exist.  What do you think about this 
solution?




Thanks.

RE: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-09 Thread 东方甲乙
Hi Becket,
This is David; thanks for the comments.  I have updated some info in the 
wiki. All the changes are now described in the workflow.
Answers to the comments:
1. Every broker only has some of the groups' committed offsets, which are stored 
in the __consumer_offsets topic; it still has to query the other 
coordinators (other brokers) for some groups' committed offsets.
So we use the OffsetFetchRequest to query one group's committed offset.


2. Using the new consumer to query the committed offsets would introduce a new 
group, but if we use the OffsetFetchRequest to query (like the 
consumer-offset-checker tool: first find the coordinator and build a channel to 
query), we will not introduce a new group.


3. I think KIP-47's functionality is a little different from this KIP, 
though we are both modifying the log retention. 


Thanks,
David.








-- Original Message --
From: "Becket Qin"
Sent: Sunday, October 9, 2016, 1:00 PM
To: "dev"

Subject: Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention



Hi David,

Thanks for the explanation. Could you update the KIP-68 wiki to include the
changes that need to be made?

I have a few more comments below:

1. We already have an internal topic __consumer_offsets to store all the
committed offsets. So the brokers can probably just consume from that to
get the committed offsets for all the partitions of each group.

2. It is probably better to use o.a.k.clients.consumer.KafkaConsumer
instead of SimpleConsumer. It handles all the leader movements and
potential failures.

3. KIP-47 also has a proposal for a new time based log retention policy and
propose a new configuration on log retention. It may be worth thinking
about the behavior together.

Thanks,

Jiangjie (Becket) Qin

On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L)  wrote:

> Hi Becket,
>
>   Thanks for the feedback:
> 1.  We use the simple consumer API to query the committed offset, so we don't
> need to specify the consumer group.
> 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> the committed offset in the log retention process.  The client can commit
> offsets or not.
> 3.  It does not need to distinguish follower brokers from leader
> brokers; every broker can query.
> 4.  We don't need to change the protocols; we mainly change the log
> retention process in the log manager.
>
>   One open question is that querying the min offset needs O(partitions * groups)
> time complexity; an alternative is to build an internal topic to save every
> partition's min offset, which reduces this to O(1).
> I will update the wiki for more details.
>
> Thanks,
> David
>
>
> > Hi Pengwei,
> >
> > Thanks for the KIP proposal. It is a very useful KIP. At a high level,
> the
> > proposed behavior looks reasonable to me.
> >
> > However, it seems that some of the details are not mentioned in the KIP.
> > For example,
> >
> > 1. How will the expected consumer group be specified? Is it through a per
> > topic dynamic configuration?
> > 2. How do the brokers detect the consumer offsets? Is it required for a
> > consumer to commit offsets?
> > 3. How do all the replicas know about the committed offsets? e.g. 1)
> > non-coordinator brokers which do not have the committed offsets, 2)
> > follower brokers which do not have consumers directly consuming from them.
> > 4. Are there any other changes that need to be made (e.g. new protocols) in
> > addition to the configuration change?
> >
> > It would be great if you can update the wiki to have more details.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) 
> wrote:
> >
> > > Hi All,
> > >I have made a KIP to enhance the log retention, details as follows:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 68+Add+a+consumed+log+retention+before+log+retention
> > >Now start a discuss thread for this KIP , looking forward to the
> > > feedback.
> > >
> > > Thanks,
> > > David
> > >
> > >
>

Re: Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-09 Thread Greg Fodor
I went ahead and did some more testing, and it feels to me one option
for resolving this issue is having a method on KGroupedStream which
can be used to configure if the operations on it (reduce/aggregate)
will forward immediately or not. I did a quick patch and was able to
determine that if the records are forwarded immediately it resolves
the issue I am seeing. Having it be done on a per-KGroupedStream basis
would provide maximum flexibility.

On Sun, Oct 9, 2016 at 1:06 AM, Greg Fodor  wrote:
> I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
> I'm hitting what seems to be a serious issue (at least, for us) with
> the changes brought about in KIP-63. In our job, we have a number of
> steps in the topology where we perform a repartition and aggregation
> on topics that require low latency. These topics have a very low
> message volume but require subsecond latency for the aggregations to
> complete since they are configuration data that drive the rest of the
> job and need to be applied immediately.
>
> In 0.10.0, we performed a through (for repartitioning) and aggregateBy
> and this resulted in minimal latency as the aggregateBy would just
> result in a consumer attached to the output of the through and the
> processor would consume + aggregate messages immediately passing them
> to the next step in the topology.
>
> However, in 0.10.1 the aggregateBy API is no longer available and it
> is necessary to pivot the data through a groupByKey and then
> aggregate(). The problem is that this mechanism results in the
> intermediate KTable state store storing the data as usual, but the
> data is not forwarded downstream until the next store flush. (Due to
> the use of ForwardingCacheFlushListener instead of calling forward()
> during the process of the record.)
>
> As noted in KIP-63 and as I saw in the code, the flush interval of
> state stores is commit.interval.ms. For us, this has been tuned to a
> few seconds, and since we have a number of these aggregations in our
> job sequentially, this now results in many seconds of latency in the
> worst case for a tuple to travel through our topology.
>
> It seems too inflexible to have the flush interval always be the same
> as the commit interval across all aggregates. For certain aggregations
> which are idempotent regardless of messages being reprocessed, being
> able to flush more often than the commit interval seems like a very
> important option when lower latency is required. It would still make
> sense to flush every commit as well, but having an additional
> configuration to set the maximum time between state store flushes
> seems like it would solve our problem.
>
> In our case, we'd set our flush interval to a few hundred ms. Ideally,
> we would really prefer to be able to disable interval based flushing
> altogether (and just put + forward all processed records) for certain
> KTables that are low volume, latency sensitive, and which are
> idempotent under message reprocessing.
>
> Thanks for any help! Right now the only option it seems is for us to
> radically lower the commit interval and accept any leftover latency,
> but unless we can find a sweet spot this may be a blocker for us to
> moving to 0.10.1.


Store flushing on commit.interval.ms from KIP-63 introduces aggregation latency

2016-10-09 Thread Greg Fodor
I'm taking 0.10.1 for a spin on our existing Kafka Streams jobs and
I'm hitting what seems to be a serious issue (at least, for us) with
the changes brought about in KIP-63. In our job, we have a number of
steps in the topology where we perform a repartition and aggregation
on topics that require low latency. These topics have a very low
message volume but require subsecond latency for the aggregations to
complete since they are configuration data that drive the rest of the
job and need to be applied immediately.

In 0.10.0, we performed a through (for repartitioning) and aggregateBy
and this resulted in minimal latency as the aggregateBy would just
result in a consumer attached to the output of the through and the
processor would consume + aggregate messages immediately passing them
to the next step in the topology.

However, in 0.10.1 the aggregateBy API is no longer available and it
is necessary to pivot the data through a groupByKey and then
aggregate(). The problem is that this mechanism results in the
intermediate KTable state store storing the data as usual, but the
data is not forwarded downstream until the next store flush. (Due to
the use of ForwardingCacheFlushListener instead of calling forward()
during the process of the record.)

As noted in KIP-63 and as I saw in the code, the flush interval of
state stores is commit.interval.ms. For us, this has been tuned to a
few seconds, and since we have a number of these aggregations in our
job sequentially, this now results in many seconds of latency in the
worst case for a tuple to travel through our topology.

It seems too inflexible to have the flush interval always be the same
as the commit interval across all aggregates. For certain aggregations
which are idempotent regardless of messages being reprocessed, being
able to flush more often than the commit interval seems like a very
important option when lower latency is required. It would still make
sense to flush every commit as well, but having an additional
configuration to set the maximum time between state store flushes
seems like it would solve our problem.

In our case, we'd set our flush interval to a few hundred ms. Ideally,
we would really prefer to be able to disable interval based flushing
altogether (and just put + forward all processed records) for certain
KTables that are low volume, latency sensitive, and which are
idempotent under message reprocessing.

Thanks for any help! Right now the only option it seems is for us to
radically lower the commit interval and accept any leftover latency,
but unless we can find a sweet spot this may be a blocker for us to
moving to 0.10.1.


Re: [DISCUSS] KIP-68 Add a consumed log retention before log retention

2016-10-09 Thread Dong Lin
Hey David,

Thanks for the KIP. Can you help with the following two questions:

1) If someone starts a consumer (e.g. kafka-console-consumer) to consume a
topic for debug/validation purposes, a random consumer group may be created
and offsets may be committed for this consumer group. If no offset commit is
made for this consumer group in the future, will this effectively
disable consumed log retention for this topic? In other words, how does this
KIP distinguish active consumer groups from inactive ones?

2) Why do we need new configs such as log.retention.commitoffset.hours? Can
we simply delete log segments if consumed log retention is enabled for this
topic and all consumer groups have consumed messages in the log segment?

Thanks,
Dong



On Sat, Oct 8, 2016 at 2:15 AM, Pengwei (L)  wrote:

> Hi Becket,
>
>   Thanks for the feedback:
> 1.  We use the simple consumer API to query the committed offset, so we don't
> need to specify the consumer group.
> 2.  Every broker uses the simple consumer API (OffsetFetchKey) to query
> the committed offset in the log retention process.  The client can commit
> offsets or not.
> 3.  It does not need to distinguish follower brokers from leader
> brokers; every broker can query.
> 4.  We don't need to change the protocols; we mainly change the log
> retention process in the log manager.
>
>   One open question is that querying the min offset needs O(partitions * groups)
> time complexity; an alternative is to build an internal topic to save every
> partition's min offset, which reduces this to O(1).
> I will update the wiki for more details.
>
> Thanks,
> David
>
>
> > Hi Pengwei,
> >
> > Thanks for the KIP proposal. It is a very useful KIP. At a high level,
> the
> > proposed behavior looks reasonable to me.
> >
> > However, it seems that some of the details are not mentioned in the KIP.
> > For example,
> >
> > 1. How will the expected consumer group be specified? Is it through a per
> > topic dynamic configuration?
> > 2. How do the brokers detect the consumer offsets? Is it required for a
> > consumer to commit offsets?
> > 3. How do all the replicas know about the committed offsets? e.g. 1)
> > non-coordinator brokers which do not have the committed offsets, 2)
> > follower brokers which do not have consumers directly consuming from them.
> > 4. Are there any other changes that need to be made (e.g. new protocols) in
> > addition to the configuration change?
> >
> > It would be great if you can update the wiki to have more details.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Sep 7, 2016 at 2:26 AM, Pengwei (L) 
> wrote:
> >
> > > Hi All,
> > >I have made a KIP to enhance the log retention, details as follows:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 68+Add+a+consumed+log+retention+before+log+retention
> > >Now start a discuss thread for this KIP , looking forward to the
> > > feedback.
> > >
> > > Thanks,
> > > David
> > >
> > >
>