Re: Can Kafka 0.9 guarantee no data loss

2016-09-22 Thread Becket Qin
In order to complete a produce response, two conditions must be satisfied:
A. The leader's high watermark should have reached the requiredOffset
(the max offset in that produce request for that partition).
B. The number of in-sync replicas is at least min.isr.

The ultimate goal here is to make sure that at least min.isr replicas
have caught up to requiredOffset. So the check is not only whether we have
enough replicas in the ISR, but also whether those replicas in
the ISR have caught up to the required offset.

In your example, if numAcks is 0 and curInSyncReplica.size >= minIsr, the
produce response won't be returned if min.isr > 0, because
leaderReplica.highWatermark must be less than requiredOffset given that
numAcks is 0, i.e. condition A is not met.

We are actually doing a stronger-than-necessary check here.
Theoretically, as long as min.isr replicas have caught up to
requiredOffset, we should be able to return the response, but we also
require those replicas to be in the ISR.
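To make the two conditions concrete, here is a tiny Java paraphrase of the combined check. This is an illustrative sketch only, not the actual broker source, and the class and method names are made up:

// Hypothetical paraphrase of the acks=-1 completion check described above.
class ProduceAckCheck {
    static boolean satisfied(long leaderHighWatermark, long requiredOffset,
                             int curInSyncReplicas, int minIsr) {
        if (leaderHighWatermark >= requiredOffset) {   // condition A: HW has reached the required offset
            // condition B: enough replicas still in the ISR
            // (in the real code, if A holds but B fails, the request completes
            //  with a NotEnoughReplicasAfterAppend error rather than waiting)
            return curInSyncReplicas >= minIsr;
        }
        return false;                                  // stay in purgatory until more replicas catch up
    }
}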

On Thu, Sep 22, 2016 at 8:15 PM, Kafka  wrote:

> @wangguozhang, could you give me some advice?
>
> > 在 2016年9月22日,下午6:56,Kafka  写道:
> >
> > Hi all,
> >   In terms of the topic, we create a topic with 6 partitions, each
> > with 3 replicas.
> >   In terms of the producer, we send messages with acks=-1 using the sync
> > interface.
> >   In terms of the brokers, we set min.insync.replicas to 2.
> >
> > After reviewing the Kafka broker's code, we know that when we send a message
> > to the broker with acks=-1, we get a response if the ISR of this partition is
> > greater than or equal to min.insync.replicas. But what confuses
> > me is that the replicas in the ISR are not strongly consistent: in Kafka 0.9 the
> > replica.lag.time.max.ms parameter is used to decide whether to shrink the ISR,
> > and its default is 10000 ms, so a replica's data in the ISR can lag by up to that much.
> > When we restart the broker which owns this partition's leader, the controller
> > will start a new leader election, which will choose the first replica in the
> > ISR that is not the current leader as the new leader. Will this lose
> > data?
> >
> >
> > The main produce handling code is shown below:
> >
> >     val numAcks = curInSyncReplicas.count(r => {
> >       if (!r.isLocal)
> >         if (r.logEndOffset.messageOffset >= requiredOffset) {
> >           trace("Replica %d of %s-%d received offset %d".format(r.brokerId, topic, partitionId, requiredOffset))
> >           true
> >         }
> >         else
> >           false
> >       else
> >         true /* also count the local (leader) replica */
> >     })
> >
> >     trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, topic, partitionId))
> >
> >     val minIsr = leaderReplica.log.get.config.minInSyncReplicas
> >
> >     if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
> >       /*
> >        * The topic may be configured not to accept messages if there are not enough replicas in the ISR;
> >        * in this scenario the request was already appended locally and then added to the purgatory before the ISR was shrunk.
> >        */
> >       if (minIsr <= curInSyncReplicas.size) {
> >         (true, ErrorMapping.NoError)
> >       } else {
> >         (true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
> >       }
> >     } else
> >       (false, ErrorMapping.NoError)
> >
> >
> > Why is numAcks only logged and not compared with minIsr? If
> > numAcks is 0 but curInSyncReplicas.size >= minIsr, this will still return;
> > since the ISR shrink procedure is not real-time, can this lose data after a
> > leader election?
> >
> > Feedback is greatly appreciated. Thanks.
> > meituan.inf
> >
> >
> >
>
>
>



[jira] [Commented] (KAFKA-2170) 10 LogTest cases failed for file.renameTo failed under windows

2016-09-22 Thread Soumyajit Sahu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15515023#comment-15515023
 ] 

Soumyajit Sahu commented on KAFKA-2170:
---

[~haraldk], I have updated/fixed https://github.com/apache/kafka/pull/1757 
to delete the .deleted files. I had missed closing file channels while using 
the Java FileChannel API, which had led to runaway open file handles.
Testing on Windows Server 2012 (with the default log.segment.delete.delay.ms), I 
can see that the files get deleted successfully this time.
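As a rough illustration of the fix pattern (not the actual patch in PR 1757; the names below are hypothetical), a try-with-resources sketch that guarantees the FileChannel's handle is released, so a later rename/delete of the segment on Windows can succeed:

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

class SegmentDeleter {
    // Opening a FileChannel without closing it keeps an open handle on the file;
    // on Windows that handle blocks a subsequent rename or delete of the segment.
    static void touchSegment(Path segment) throws IOException {
        // try-with-resources releases the channel (and its file handle) even on exceptions
        try (FileChannel ch = FileChannel.open(segment, StandardOpenOption.READ)) {
            ch.size(); // any work with the channel goes here
        }
    }
}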

> 10 LogTest cases failed for  file.renameTo failed under windows
> ---
>
> Key: KAFKA-2170
> URL: https://issues.apache.org/jira/browse/KAFKA-2170
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.1.0
> Environment: Windows
>Reporter: Honghai Chen
>Assignee: Jay Kreps
>
> get latest code from trunk, then run test 
> gradlew  -i core:test --tests kafka.log.LogTest
> Got 10 cases failed for same reason:
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 0
>   at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:259)
>   at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:756)
>   at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:747)
>   at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
>   at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:514)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at kafka.log.Log.deleteOldSegments(Log.scala:514)
>   at kafka.log.LogTest.testAsyncDelete(LogTest.scala:633)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:601)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:44)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:180)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:41)
>   at org.junit.runners.ParentRunner$1.evaluate(ParentRunner.java:173)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:31)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:220)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:86)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:49)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:69)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:48)
>   at sun.reflect.GeneratedMethodAccessor5.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:601)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at $Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:105)
>   at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at 

Build failed in Jenkins: kafka-trunk-jdk8 #908

2016-09-22 Thread Apache Jenkins Server
See 

Changes:

[jason] MINOR: Increase `zkConnectionTimeout` and timeout in

[ismael] MINOR: Add test cases for delays in consumer rebalance listener

--
[...truncated 3618 lines...]

kafka.log.LogTest > testReadAtLogGap STARTED

kafka.log.LogTest > testReadAtLogGap PASSED

kafka.log.LogTest > testTimeBasedLogRoll STARTED

kafka.log.LogTest > testTimeBasedLogRoll PASSED

kafka.log.LogTest > testLoadEmptyLog STARTED

kafka.log.LogTest > testLoadEmptyLog PASSED

kafka.log.LogTest > testMessageSetSizeCheck STARTED

kafka.log.LogTest > testMessageSetSizeCheck PASSED

kafka.log.LogTest > testIndexResizingAtTruncation STARTED

kafka.log.LogTest > testIndexResizingAtTruncation PASSED

kafka.log.LogTest > testCompactedTopicConstraints STARTED

kafka.log.LogTest > testCompactedTopicConstraints PASSED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset STARTED

kafka.log.LogTest > testThatGarbageCollectingSegmentsDoesntChangeOffset PASSED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets STARTED

kafka.log.LogTest > testAppendAndReadWithSequentialOffsets PASSED

kafka.log.LogTest > testDeleteOldSegmentsMethod STARTED

kafka.log.LogTest > testDeleteOldSegmentsMethod PASSED

kafka.log.LogTest > shouldDeleteSizeBasedSegments STARTED

kafka.log.LogTest > shouldDeleteSizeBasedSegments PASSED

kafka.log.LogTest > testParseTopicPartitionNameForNull STARTED

kafka.log.LogTest > testParseTopicPartitionNameForNull PASSED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets STARTED

kafka.log.LogTest > testAppendAndReadWithNonSequentialOffsets PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator STARTED

kafka.log.LogTest > testParseTopicPartitionNameForMissingSeparator PASSED

kafka.log.LogTest > testCorruptIndexRebuild STARTED

kafka.log.LogTest > testCorruptIndexRebuild PASSED

kafka.log.LogTest > shouldDeleteTimeBasedSegmentsReadyToBeDeleted STARTED

kafka.log.LogTest > shouldDeleteTimeBasedSegmentsReadyToBeDeleted PASSED

kafka.log.LogTest > testReadWithTooSmallMaxLength STARTED

kafka.log.LogTest > testReadWithTooSmallMaxLength PASSED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved STARTED

kafka.log.LogTest > testBogusIndexSegmentsAreRemoved PASSED

kafka.log.LogTest > testCompressedMessages STARTED

kafka.log.LogTest > testCompressedMessages PASSED

kafka.log.LogTest > testAppendMessageWithNullPayload STARTED

kafka.log.LogTest > testAppendMessageWithNullPayload PASSED

kafka.log.LogTest > testCorruptLog STARTED

kafka.log.LogTest > testCorruptLog PASSED

kafka.log.LogTest > testLogRecoversToCorrectOffset STARTED

kafka.log.LogTest > testLogRecoversToCorrectOffset PASSED

kafka.log.LogTest > testReopenThenTruncate STARTED

kafka.log.LogTest > testReopenThenTruncate PASSED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition STARTED

kafka.log.LogTest > testParseTopicPartitionNameForMissingPartition PASSED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName STARTED

kafka.log.LogTest > testParseTopicPartitionNameForEmptyName PASSED

kafka.log.LogTest > testOpenDeletesObsoleteFiles STARTED

kafka.log.LogTest > testOpenDeletesObsoleteFiles PASSED

kafka.log.LogTest > testRebuildTimeIndexForOldMessages STARTED

kafka.log.LogTest > testRebuildTimeIndexForOldMessages PASSED

kafka.log.LogTest > testSizeBasedLogRoll STARTED

kafka.log.LogTest > testSizeBasedLogRoll PASSED

kafka.log.LogTest > shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize 
STARTED

kafka.log.LogTest > shouldNotDeleteSizeBasedSegmentsWhenUnderRetentionSize 
PASSED

kafka.log.LogTest > testTimeBasedLogRollJitter STARTED

kafka.log.LogTest > testTimeBasedLogRollJitter PASSED

kafka.log.LogTest > testParseTopicPartitionName STARTED

kafka.log.LogTest > testParseTopicPartitionName PASSED

kafka.log.LogTest > testTruncateTo STARTED

kafka.log.LogTest > testTruncateTo PASSED

kafka.log.LogTest > testCleanShutdownFile STARTED

kafka.log.LogTest > testCleanShutdownFile PASSED

kafka.log.LogTest > testBuildTimeIndexWhenNotAssigningOffsets STARTED

kafka.log.LogTest > testBuildTimeIndexWhenNotAssigningOffsets PASSED

kafka.log.LogConfigTest > shouldValidateThrottledReplicasConfig STARTED

kafka.log.LogConfigTest > shouldValidateThrottledReplicasConfig PASSED

kafka.log.LogConfigTest > testFromPropsEmpty STARTED

kafka.log.LogConfigTest > testFromPropsEmpty PASSED

kafka.log.LogConfigTest > testKafkaConfigToProps STARTED

kafka.log.LogConfigTest > testKafkaConfigToProps PASSED

kafka.log.LogConfigTest > testFromPropsInvalid STARTED

kafka.log.LogConfigTest > testFromPropsInvalid PASSED

kafka.log.CleanerTest > testBuildOffsetMap STARTED

kafka.log.CleanerTest > testBuildOffsetMap PASSED

kafka.log.CleanerTest > testBuildOffsetMapFakeLarge STARTED

kafka.log.CleanerTest > testBuildOffsetMapFakeLarge PASSED

kafka.log.CleanerTest > testSegmentGrouping STARTED


Build failed in Jenkins: kafka-trunk-jdk8 #907

2016-09-22 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] HOTFIX: Decrease commit interval

--
[...truncated 11683 lines...]

org.apache.kafka.clients.MetadataTest > testListenerCanUnregister PASSED

org.apache.kafka.clients.MetadataTest > testTopicExpiry STARTED

org.apache.kafka.clients.MetadataTest > testTopicExpiry PASSED

org.apache.kafka.clients.MetadataTest > testFailedUpdate STARTED

org.apache.kafka.clients.MetadataTest > testFailedUpdate PASSED

org.apache.kafka.clients.MetadataTest > testMetadataUpdateWaitTime STARTED

org.apache.kafka.clients.MetadataTest > testMetadataUpdateWaitTime PASSED

org.apache.kafka.clients.MetadataTest > testUpdateWithNeedMetadataForAllTopics 
STARTED

org.apache.kafka.clients.MetadataTest > testUpdateWithNeedMetadataForAllTopics 
PASSED

org.apache.kafka.clients.MetadataTest > testClusterListenerGetsNotifiedOfUpdate 
STARTED

org.apache.kafka.clients.MetadataTest > testClusterListenerGetsNotifiedOfUpdate 
PASSED

org.apache.kafka.clients.MetadataTest > testMetadata STARTED

org.apache.kafka.clients.MetadataTest > testMetadata PASSED

org.apache.kafka.clients.MetadataTest > testListenerGetsNotifiedOfUpdate STARTED

org.apache.kafka.clients.MetadataTest > testListenerGetsNotifiedOfUpdate PASSED

org.apache.kafka.clients.MetadataTest > testNonExpiringMetadata STARTED

org.apache.kafka.clients.MetadataTest > testNonExpiringMetadata PASSED
:clients:determineCommitId UP-TO-DATE
:clients:createVersionFile
:clients:jar UP-TO-DATE
:core:compileJava UP-TO-DATE
:core:compileScala
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option 
MaxPermSize=512m; support was removed in 8.0

:79:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.

org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP
 ^
:36:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 commitTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,

  ^
:37:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
 expireTimestamp: Long = 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {

  ^
:505:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
  if (offsetAndMetadata.expireTimestamp == 
org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)

^
:311:
 value DEFAULT_TIMESTAMP in object OffsetCommitRequest is deprecated: see 
corresponding Javadoc for more information.
  if (partitionData.timestamp == 
OffsetCommitRequest.DEFAULT_TIMESTAMP)
 ^
:93:
 class ProducerConfig in package producer is deprecated: This class has been 
deprecated and will be removed in a future release. Please use 
org.apache.kafka.clients.producer.ProducerConfig instead.
val producerConfig = new ProducerConfig(props)
 ^
:94:
 method fetchTopicMetadata in object ClientUtils is deprecated: This method has 
been deprecated and will be removed in a future release.
fetchTopicMetadata(topics, brokers, producerConfig, correlationId)
^
:393:
 constructor UpdateMetadataRequest in class UpdateMetadataRequest is 
deprecated: see corresponding Javadoc for more information.
new UpdateMetadataRequest(controllerId, controllerEpoch, 
liveBrokers.asJava, partitionStates.asJava)
^

[jira] [Commented] (KAFKA-4178) Replication Throttling: Consolidate Rate Classes

2016-09-22 Thread Jun Rao (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15514723#comment-15514723
 ] 

Jun Rao commented on KAFKA-4178:


[~benstopford], thanks for the patch. I like your idea of consolidating the 
Rate to use just FixedSubWindowPolicy, i.e., if the measured window is less than a 
full window size, we adjust it to the size of just one full window (instead of # 
samples * full window size). This addresses (1) the NaN issue when the 
recording and the measuring are done within the same ms, since the adjusted 
window size will never be 0; (2) the issue where the rate can be extremely high 
when the measured window before adjustment is very small (which can lead to too 
long a delay for throttled clients); and (3) the issue of spiking with 
replication throttling, since the spiking can only happen on the first window 
instead of # samples * full window size.

[~jjkoshy], does that sound good to you too?
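For illustration, a rough sketch of the window adjustment being described, under the assumption that the rate is measured as total / elapsed-window; this is a paraphrase of the idea, not Kafka's actual Rate/SampledStat code, and the names are hypothetical:

// Hypothetical illustration: clamp the measured window to at least one full
// sample window, so the rate can neither divide by ~0 ms (NaN / huge spikes)
// nor be averaged over "numSamples * windowMs" before enough samples exist.
class RateSketch {
    static double measure(double totalRecorded, long elapsedMs, long sampleWindowMs) {
        long windowMs = Math.max(elapsedMs, sampleWindowMs); // adjust tiny windows up to one full window
        return totalRecorded / (windowMs / 1000.0);          // rate per second
    }
}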

> Replication Throttling: Consolidate Rate Classes
> 
>
> Key: KAFKA-4178
> URL: https://issues.apache.org/jira/browse/KAFKA-4178
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.10.1.0
>Reporter: Ben Stopford
>
> Replication throttling is using a different implementation of Rate to client 
> throttling (Rate & SimpleRate). These should be consolidated so both use the 
> same approach. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : kafka-trunk-jdk7 #1563

2016-09-22 Thread Apache Jenkins Server
See 



[GitHub] kafka pull request #1866: MINOR: Add test cases for delays in consumer rebal...

2016-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1866


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka pull request #1889: MINOR: Increase `zkConnectionTimeout` and timeout ...

2016-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1889




Build failed in Jenkins: kafka-trunk-jdk8 #906

2016-09-22 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] MINOR: Fix comments in KStreamKStreamJoinTest

--
[...truncated 5985 lines...]

kafka.consumer.PartitionAssignorTest > testRangePartitionAssignor PASSED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
STARTED

kafka.network.SocketServerTest > testClientDisconnectionUpdatesRequestMetrics 
PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIp STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIp PASSED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics STARTED

kafka.network.SocketServerTest > 
testBrokerSendAfterChannelClosedUpdatesRequestMetrics PASSED

kafka.network.SocketServerTest > simpleRequest STARTED

kafka.network.SocketServerTest > simpleRequest PASSED

kafka.network.SocketServerTest > testSessionPrincipal STARTED

kafka.network.SocketServerTest > testSessionPrincipal PASSED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides STARTED

kafka.network.SocketServerTest > testMaxConnectionsPerIpOverrides PASSED

kafka.network.SocketServerTest > testSocketsCloseOnShutdown STARTED

kafka.network.SocketServerTest > testSocketsCloseOnShutdown PASSED

kafka.network.SocketServerTest > testSslSocketServer STARTED

kafka.network.SocketServerTest > testSslSocketServer PASSED

kafka.network.SocketServerTest > tooBigRequestIsRejected STARTED

kafka.network.SocketServerTest > tooBigRequestIsRejected PASSED

kafka.security.auth.PermissionTypeTest > testFromString STARTED

kafka.security.auth.PermissionTypeTest > testFromString PASSED

kafka.security.auth.AclTest > testAclJsonConversion STARTED

kafka.security.auth.AclTest > testAclJsonConversion PASSED

kafka.security.auth.ResourceTypeTest > testFromString STARTED

kafka.security.auth.ResourceTypeTest > testFromString PASSED

kafka.security.auth.OperationTest > testFromString STARTED

kafka.security.auth.OperationTest > testFromString PASSED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled STARTED

kafka.security.auth.ZkAuthorizationTest > testIsZkSecurityEnabled PASSED

kafka.security.auth.ZkAuthorizationTest > testZkUtils STARTED

kafka.security.auth.ZkAuthorizationTest > testZkUtils PASSED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkAntiMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testZkMigration STARTED

kafka.security.auth.ZkAuthorizationTest > testZkMigration PASSED

kafka.security.auth.ZkAuthorizationTest > testChroot STARTED

kafka.security.auth.ZkAuthorizationTest > testChroot PASSED

kafka.security.auth.ZkAuthorizationTest > testDelete STARTED

kafka.security.auth.ZkAuthorizationTest > testDelete PASSED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive STARTED

kafka.security.auth.ZkAuthorizationTest > testDeleteRecursive PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAllowAllAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testLocalConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyDeletionOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFound PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testDistributedConcurrentModificationOfResourceAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testAclManagementAPIs PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testWildCardAcls PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testTopicAcl PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testSuperUserHasAccess PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testDenyTakesPrecedence PASSED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride STARTED

kafka.security.auth.SimpleAclAuthorizerTest > testNoAclFoundOverride PASSED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls STARTED

kafka.security.auth.SimpleAclAuthorizerTest > 
testHighConcurrencyModificationOfResourceAcls PASSED


Build failed in Jenkins: kafka-trunk-jdk7 #1562

2016-09-22 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3782: Ensure heartbeat thread restarted after rebalance woken 
up

[wangguoz] MINOR: Fix comments in KStreamKStreamJoinTest

--
[...truncated 1737 lines...]
kafka.api.SslProducerSendTest > testCloseWithZeroTimeoutFromSenderThread STARTED

kafka.api.SslProducerSendTest > testCloseWithZeroTimeoutFromSenderThread PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testCoordinatorFailover STARTED

kafka.api.SaslPlainPlaintextConsumerTest > testCoordinatorFailover PASSED

kafka.api.SaslPlainPlaintextConsumerTest > testSimpleConsumption STARTED

kafka.api.SaslPlainPlaintextConsumerTest > testSimpleConsumption PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededToReadFromNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededToReadFromNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > testDeleteWithWildCardAuth STARTED

kafka.api.AuthorizerIntegrationTest > testDeleteWithWildCardAuth PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testListOfsetsWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testListOfsetsWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicRead STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicRead PASSED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testListOffsetsWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededForWritingToNonExistentTopic STARTED

kafka.api.AuthorizerIntegrationTest > 
testCreatePermissionNeededForWritingToNonExistentTopic PASSED

kafka.api.AuthorizerIntegrationTest > 
testSimpleConsumeWithOffsetLookupAndNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > 
testSimpleConsumeWithOffsetLookupAndNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithNoTopicAccess STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithNoTopicAccess PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > testAuthorization STARTED

kafka.api.AuthorizerIntegrationTest > testAuthorization PASSED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicAndGroupRead STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicAndGroupRead PASSED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithoutDescribe 
STARTED

kafka.api.AuthorizerIntegrationTest > testUnauthorizedDeleteWithoutDescribe 
PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicWrite STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithTopicWrite PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoGroupAccess STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithNoGroupAccess PASSED

kafka.api.AuthorizerIntegrationTest > testConsumeWithNoAccess STARTED

kafka.api.AuthorizerIntegrationTest > testConsumeWithNoAccess PASSED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithTopicAndGroupRead 
STARTED

kafka.api.AuthorizerIntegrationTest > testOffsetFetchWithTopicAndGroupRead 
PASSED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testCommitWithTopicDescribe PASSED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicDescribe STARTED

kafka.api.AuthorizerIntegrationTest > testProduceWithTopicDescribe PASSED


[jira] [Commented] (KAFKA-4202) Facing error while trying to create the Producer.

2016-09-22 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15514380#comment-15514380
 ] 

Ewen Cheslack-Postava commented on KAFKA-4202:
--

It looks like you may have mismatched versions of Kafka jars on your classpath 
since there is a reference to a method that doesn't exist. You should check 
what's on your classpath and verify there aren't multiple versions.

> Facing error while trying to create the Producer.
> -
>
> Key: KAFKA-4202
> URL: https://issues.apache.org/jira/browse/KAFKA-4202
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rohan
>
> While trying to run the command 
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first-topic
> I am facing the below error.
> ERROR StatusLogger No log4j2 configuration file found. Using default 
> configuration: logging only errors to the console.
> Exception in thread "main" java.lang.NoSuchMethodError: 
> kafka.utils.CommandLineUtils$.parseKeyValueArgs(Lscala/collection/Iterable;)Ljava/util/Properties;
>   at 
> kafka.tools.ConsoleProducer$ProducerConfig.<init>(ConsoleProducer.scala:279)
>   at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:38)
>   at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1899: HOTFIX: Decrease commit interval

2016-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1899




[DISCUSS] KIP-82 - Add Record Headers

2016-09-22 Thread Michael Pearce
Hi All,


I would like to discuss the following KIP proposal:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers



I have some initial drafts of roughly the changes that would be needed. This 
is nowhere near finalized, and I look forward to the discussion, especially as some 
bits I'm personally in two minds about.

https://github.com/michaelandrepearce/kafka/tree/kafka-headers-properties



Here is a link to an alternative option mentioned in the KIP, but one I would 
personally discard (disadvantages mentioned in the KIP):

https://github.com/michaelandrepearce/kafka/tree/kafka-headers-full
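
To give a feel for the idea, a purely hypothetical sketch of a record that carries headers alongside the key/value payload. This is not the KIP-82 API; all names below are made up for illustration:

import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical record carrying headers separately from the key/value payload.
class RecordWithHeaders<K, V> {
    final String topic;
    final K key;
    final V value;
    // Headers keep per-message metadata (tracing ids, schema ids, routing hints)
    // out of the payload, so intermediaries can read them without deserializing the value.
    final Map<String, byte[]> headers = new LinkedHashMap<>();

    RecordWithHeaders(String topic, K key, V value) {
        this.topic = topic;
        this.key = key;
        this.value = value;
    }

    RecordWithHeaders<K, V> withHeader(String headerKey, byte[] headerValue) {
        headers.put(headerKey, headerValue);
        return this;
    }
}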


Thanks

Mike





The information contained in this email is strictly confidential and for the 
use of the addressee only, unless otherwise indicated. If you are not the 
intended recipient, please do not read, copy, use or disclose to others this 
message or any attachment. Please also notify the sender by replying to this 
email or by telephone (+44(020 7896 0011) and then delete the email and any 
copies of it. Opinions, conclusion (etc) that do not relate to the official 
business of this company shall be understood as neither given nor endorsed by 
it. IG is a trading name of IG Markets Limited (a company registered in England 
and Wales, company number 04008957) and IG Index Limited (a company registered 
in England and Wales, company number 01190902). Registered address at Cannon 
Bridge House, 25 Dowgate Hill, London EC4R 2YA. Both IG Markets Limited 
(register number 195355) and IG Index Limited (register number 114059) are 
authorised and regulated by the Financial Conduct Authority.


Re: ProducerRecord/Consumer MetaData/Headers

2016-09-22 Thread Michael Pearce
Thanks Ismael.

So I have made a start on a draft KIP; please see it here.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers

I will start a DISCUSS thread shortly.


From: isma...@gmail.com  on behalf of Ismael Juma 

Sent: Thursday, September 22, 2016 9:42 AM
To: dev@kafka.apache.org
Subject: Re: ProducerRecord/Consumer MetaData/Headers

Sorry for the delay, you should have access now.

Ismael

On Thu, Sep 22, 2016 at 8:15 AM, Michael Pearce 
wrote:

> Hi again,
>
> Sorry to be nudging this, but it seems I'm still unable to create a page
> in the KIP Proposal area. Just don't want to be forgotten about between all
> the emails.
>
> Cheers
> Mike
> 
> From: Michael Pearce 
> Sent: Monday, September 19, 2016 6:19 PM
> To: dev@kafka.apache.org
> Subject: Re: ProducerRecord/Consumer MetaData/Headers
>
> Hi Again,
>
> I went to the wiki this afternoon to start writing it up, and seems I
> cannot create a page under the KIP area still. If someone could assist.
>
> Cheers
> Mike
>
>
> On 9/18/16, 7:07 PM, "Michael Pearce"  wrote:
>
> Hi Ismael,
>
> Thanks, my wiki user is michael.andre.pearce.
>
> Re the link, thanks again. We did indeed start off trying to do this
> after we lost the ability to use the key to hold metadata once the
> compaction feature came, but abusing the payload isn't, IMO, a
> great solution; it has some issues that cannot be overcome and that stop us
> from using it in some of our data/message flows. As such I think a solution
> in the broker/message/client needs to be made and formalised. An
> ecosystem of tools could then rely on it.
>
> I will add all details in KIP proposal, once I have access.
>
> Cheers
> Mike
>
>
> 
> From: isma...@gmail.com  on behalf of Ismael Juma <
> ism...@juma.me.uk>
> Sent: Sunday, September 18, 2016 9:01:22 AM
> To: dev@kafka.apache.org
> Subject: Re: ProducerRecord/Consumer MetaData/Headers
>
> Hi Mike,
>
> If you give me your wiki user name, I can give you the required
> permissions
> to post a KIP. This is definitely a big change and there is no clear
> consensus if changing the Kafka message format is the right way (it
> would
> be good not to pay the cost if you don't need it) or if it should be
> done
> via schemas, for example. Gwen shared some thoughts in the following
> message:
>
> http://search-hadoop.com/m/uyzND1OXS8EoGCU2
>
> Ismael
>
> On Sun, Sep 18, 2016 at 7:11 AM, Michael Pearce  >
> wrote:
>
> > Hi All, (again)
> >
> > If it helps the discussion, an almost-ready patch implementing this is
> > available here:
> >
> > https://github.com/michaelandrepearce/kafka
> >
> >
> > The biggest/most core change is obviously the kafka.message.Message
> object.
> >
> >
> >
> > Some key bits in this implementation are on the server side, with the
> > submodules (connect, mirrormaker, streams) all updated to be aware of the
> > new "headers".
> >
> >
> >
> > As this is a big API change, to use the new feature on the client side you
> > use the ConsumerRecord and ProducerRecord (which now extend the new Enhanced
> > versions) for K,V records without any code changes; to use the headers you
> > use the Enhanced versions, HeadersConsumerRecord and HeadersProducerRecord.
> > This was needed to avoid causing code compilation failures just by
> > upgrading. If the patch were accepted I would imagine it as a way to
> > transition.
> >
> >
> >
> > I am guessing this needs a KIP rather than just myself raising a JIRA, as it
> > is a fairly substantial API change, but I'm unsure who can raise these, so
> > assistance with the process would be gratefully accepted.
> >
> >
> >
> > Cheers
> >
> > Mike
> >
> >
> >
> >
> >
> >
> > From: Michael Pearce 
> > Date: Saturday, September 17, 2016 at 6:40 AM
> > To: "dev@kafka.apache.org" 
> > Subject: ProducerRecord/Consumer MetaData/Headers
> >
> > Hi All,
> >
> > First of all, apologies if this has been previously discussed; I have just
> > joined the mailing list (I cannot find a related JIRA or KIP, nor anything
> > via good old Google search).
> >
> > In our company we are looking to replace some of our more traditional
> > message flows with Kafka.
> >
> > One thing we have found lacking though compared with most messaging
> > systems is the ability to set header/metadata separate from our
> payload. We
> > did think about the key, but as this is used for compaction we

[jira] [Created] (KAFKA-4208) Add Record Headers

2016-09-22 Thread Michael Andre Pearce (IG) (JIRA)
Michael Andre Pearce (IG) created KAFKA-4208:


 Summary: Add Record Headers
 Key: KAFKA-4208
 URL: https://issues.apache.org/jira/browse/KAFKA-4208
 Project: Kafka
  Issue Type: New Feature
  Components: clients, core
Reporter: Michael Andre Pearce (IG)
Priority: Critical


Currently headers are not natively supported, unlike in many transport and 
messaging platforms and standards; this issue is to add support for headers to Kafka.

This JIRA is related to KIP found here:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Jenkins build is back to normal : kafka-0.10.1-jdk7 #4

2016-09-22 Thread Apache Jenkins Server
See 



Re: Unable to locate auto.create.topics.enable=true path for KafkaProducer

2016-09-22 Thread Thakrar, Jayesh
Umesh, 

That is a Kafka broker level config/parameter and not part of the producer.
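
To make the split concrete: the producer only issues a metadata request inside waitOnMetadata(); if the broker has auto.create.topics.enable=true in server.properties, the broker itself creates the unknown topic while handling that metadata request, so there is no createTopic call in the producer. A minimal producer-side sketch (the topic name and bootstrap address are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AutoCreateExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "brand-new-topic" need not exist: send() blocks in waitOnMetadata(),
            // the broker auto-creates the topic while serving the metadata request
            // (only if auto.create.topics.enable=true on the broker side),
            // and the send proceeds once the partition metadata arrives.
            producer.send(new ProducerRecord<>("brand-new-topic", "key", "value"));
        }
    }
}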

Jayesh


On 9/22/16, 3:09 AM, "UMESH CHAUDHARY"  wrote:

Hi Mates,
I was trying to understand how, when auto.create.topics.enable=true, the
KafkaProducer first creates the topic and then sends messages to it.

What I saw:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback
callback)

method in KafkaProducer.java.

What I failed to find:

When getting metadata for the topic using the line

ClusterAndWaitTime clusterAndWaitTime =
waitOnMetadata(record.topic(), this.maxBlockTimeMs);

I failed to locate the path taken when we don't have any metadata for
the topic, i.e. the topic doesn't exist, and where, according to
"auto.create.topics.enable=true", the KafkaProducer invokes createTopic before
sending the record.

It works flawlessly with clients, so it must be coded somewhere. I also looked at
MockProducerTest.java but was unable to locate it.

I am a newbie, so please forgive any stupidity here.

Regards,
Umesh




Re: [DISCUSS] KIP-72 Allow Sizing Incoming Request Queue in Bytes

2016-09-22 Thread radai
As discussed in the KIP call, I have updated the KIP-72 page (
https://cwiki.apache.org/confluence/display/KAFKA/KIP-72%3A+Allow+putting+a+bound+on+memory+consumed+by+Incoming+requests)
to record both configuration validations and implementation concerns.
I've also implemented channel muting/unmuting in response to memory
pressure. It's available as a separate branch here -
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool-with-muting
. The implementation without muting is here -
https://github.com/radai-rosenblatt/kafka/tree/broker-memory-pool.

The mute/unmute happens just before poll(), which means that in the worst case
there will be no reads for 300 ms if memory was unavailable (that is the
timeout until the next poll). Perhaps a design with dedicated read threads
could do better (such a thread could actually block waiting for memory),
but that would be a giant change.
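
As a toy illustration of the "lenient" pool behaviour described in the quoted message below (grant any request while any memory remains, letting the counter go negative so large requests are not starved), here is a sketch only, not the actual patch; the names are hypothetical:

import java.util.concurrent.atomic.AtomicLong;

// Hypothetical memory pool: a request of any size succeeds as long as some
// memory is still available, so the available counter may briefly go negative.
// This avoids starving large requests behind a stream of small ones.
class LenientMemoryPool {
    private final AtomicLong available;

    LenientMemoryPool(long capacityBytes) {
        this.available = new AtomicLong(capacityBytes);
    }

    /** Returns true if the bytes were "allocated"; false if the pool is exhausted. */
    boolean tryAllocate(long bytes) {
        if (available.get() <= 0)
            return false;                 // out of memory: caller mutes the channel before poll()
        available.addAndGet(-bytes);      // may drive the counter below zero by design
        return true;
    }

    void release(long bytes) {
        available.addAndGet(bytes);       // returning memory lets muted channels be unmuted
    }
}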

On Tue, Sep 13, 2016 at 9:20 AM, radai  wrote:

> the specific memory pool implementation I wrote will allocate _any_ amount
> you request if it has _any_ memory available (so if it has 1 byte available
> and you ask for 1MB you will get 1MB and the counter will go negative).
> this was done to avoid issues with starvation of large requests. other
> implementations may be more strict. to me this means that generally it's not
> a simple "have memory" vs "no memory" split (which gets worse under a
> hypothetical tiered pool scheme for QoS).
>
> to allow this flexibility in pool implementation I must preserve the
> amount of memory required. once read from the channel I can't put it back,
> so I store it?
>
> On Tue, Sep 13, 2016 at 5:30 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
>> Is there any value in allowing the 4-byte size to be read even when the
>> request memory limit has been reached? If not, you can disable OP_READ
>> interest for all channels that are ready inside Selector.poll() when
>> memory
>> limit has been reached and re-enable before returning from poll(). Perhaps
>> a listener that is invoked when MemoryPool moves from unavailable to
>> available state can wakeup the selector. The changes for this should be
>> fairly contained without any additional channel state. And it would avoid
>> the overhead of polls that return immediately even when progress cannot be
>> made because memory limit has been reached.
>>
>> On Tue, Sep 13, 2016 at 12:31 AM, radai 
>> wrote:
>>
>> > Hi Jun,
>> >
>> > Yes, you're right - right now the next select() call will return
>> immediately
>> > with the same set of keys as earlier (at least) as they were not
>> previously
>> > handled (no memory).
>> > My assumption is that this happens under considerable load - something
>> has
>> > to be occupying all this memory. also, this happens in the context of
>> > SocketServer.Processor.run():
>> >
>> > while (isRunning) {
>> >configureNewConnections()
>> >processNewResponses()
>> >poll()   <-- HERE
>> >processCompletedReceives()
>> >processCompletedSends()
>> >processDisconnected()
>> > }
>> >
>> > even within poll(), things like finishConnection(), prepare(), and
>> write()s
>> > can still make progress under low memory conditions. and given the load,
>> > there's probably progress to be made in processCompletedReceives(),
>> > processCompletedSends() and processDisconnected().
>> >
>> > if there's progress to be made in other things its likely that the next
>> > call to poll() will not happen immediately and so the loop won't be that
>> > tight. in order for this to devolve into true busy waiting you would
>> need a
>> > situation where no progress can be made on any in-progress requests and
>> no
>> > responses to send out ?
>> >
>> > if my assumption does not hold then you are correct, and
>> selector.poll(300)
>> > currently hardcoded in SocketServer.Processor.poll() would need to be
>> > replaced with something more complicated. my biggest point of concern
>> > though is that the resulting code would be complicated and would couple
>> > Selector to the memory pool very tightly. under my current patch
>> Selector
>> > needs the memory pool only to pass to channels when they are built. this
>> > would allow different memory pools relatively easily for things like
>> > reserving memory for cross-broker replication and high-SLA connections.
>> a
>> > tighter coupling would make any such future modification hard.
>> >
>> > On Sun, Sep 11, 2016 at 10:37 AM, Jun Rao  wrote:
>> >
>> > > Hi, Radai,
>> > >
>> > > Thanks for the reply. I still have a followup question on #2.
>> > >
>> > > My understanding is that in your proposal, selector will now first
>> read
>> > the
>> > > size of the Receive. If there is not enough memory, it has to turn off
>> > the
>> > > READ interest bit for the corresponding KafkaChannel. Otherwise,
>> > subsequent
>> > > selector.poll() call will always return immediately, adding
>> unnecessary
>> > > 

Build failed in Jenkins: kafka-trunk-jdk8 #905

2016-09-22 Thread Apache Jenkins Server
See 

Changes:

[wangguoz] KAFKA-3782: Ensure heartbeat thread restarted after rebalance woken 
up

--
[...truncated 1019 lines...]
kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition STARTED

kafka.utils.ReplicationUtilsTest > testGetLeaderIsrAndEpochForPartition PASSED

kafka.utils.timer.TimerTest > testAlreadyExpiredTask STARTED

kafka.utils.timer.TimerTest > testAlreadyExpiredTask PASSED

kafka.utils.timer.TimerTest > testTaskExpiration STARTED

kafka.utils.timer.TimerTest > testTaskExpiration PASSED

kafka.utils.timer.TimerTaskListTest > testAll STARTED

kafka.utils.timer.TimerTaskListTest > testAll PASSED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask STARTED

kafka.utils.SchedulerTest > testMockSchedulerPeriodicTask PASSED

kafka.utils.SchedulerTest > testNonPeriodicTask STARTED

kafka.utils.SchedulerTest > testNonPeriodicTask PASSED

kafka.utils.SchedulerTest > testRestart STARTED

kafka.utils.SchedulerTest > testRestart PASSED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler STARTED

kafka.utils.SchedulerTest > testReentrantTaskInMockScheduler PASSED

kafka.utils.SchedulerTest > testPeriodicTask STARTED

kafka.utils.SchedulerTest > testPeriodicTask PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArg PASSED

kafka.utils.CommandLineUtilsTest > testParseSingleArg STARTED

kafka.utils.CommandLineUtilsTest > testParseSingleArg PASSED

kafka.utils.CommandLineUtilsTest > testParseArgs STARTED

kafka.utils.CommandLineUtilsTest > testParseArgs PASSED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid STARTED

kafka.utils.CommandLineUtilsTest > testParseEmptyArgAsValid PASSED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 STARTED

kafka.utils.UtilsTest > testGenerateUuidAsBase64 PASSED

kafka.utils.UtilsTest > testAbs STARTED

kafka.utils.UtilsTest > testAbs PASSED

kafka.utils.UtilsTest > testReplaceSuffix STARTED

kafka.utils.UtilsTest > testReplaceSuffix PASSED

kafka.utils.UtilsTest > testCircularIterator STARTED

kafka.utils.UtilsTest > testCircularIterator PASSED

kafka.utils.UtilsTest > testReadBytes STARTED

kafka.utils.UtilsTest > testReadBytes PASSED

kafka.utils.UtilsTest > testCsvList STARTED

kafka.utils.UtilsTest > testCsvList PASSED

kafka.utils.UtilsTest > testReadInt STARTED

kafka.utils.UtilsTest > testReadInt PASSED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID STARTED

kafka.utils.UtilsTest > testUrlSafeBase64EncodeUUID PASSED

kafka.utils.UtilsTest > testCsvMap STARTED

kafka.utils.UtilsTest > testCsvMap PASSED

kafka.utils.UtilsTest > testInLock STARTED

kafka.utils.UtilsTest > testInLock PASSED

kafka.utils.UtilsTest > testSwallow STARTED

kafka.utils.UtilsTest > testSwallow PASSED

kafka.utils.JsonTest > testJsonEncoding STARTED

kafka.utils.JsonTest > testJsonEncoding PASSED

kafka.utils.IteratorTemplateTest > testIterator STARTED

kafka.utils.IteratorTemplateTest > testIterator PASSED

kafka.metrics.KafkaTimerTest > testKafkaTimer STARTED

kafka.metrics.KafkaTimerTest > testKafkaTimer PASSED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > testMetricsReporterAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic STARTED

kafka.metrics.MetricsTest > 
testBrokerTopicMetricsUnregisteredAfterDeletingTopic PASSED

kafka.metrics.MetricsTest > testClusterIdMetric STARTED

kafka.metrics.MetricsTest > testClusterIdMetric PASSED

kafka.metrics.MetricsTest > testMetricsLeak STARTED

kafka.metrics.MetricsTest > testMetricsLeak PASSED

kafka.consumer.TopicFilterTest > testWhitelists STARTED

kafka.consumer.TopicFilterTest > testWhitelists PASSED

kafka.consumer.TopicFilterTest > 
testWildcardTopicCountGetTopicCountMapEscapeJson STARTED

kafka.consumer.TopicFilterTest > 
testWildcardTopicCountGetTopicCountMapEscapeJson PASSED

kafka.consumer.TopicFilterTest > testBlacklists STARTED

kafka.consumer.TopicFilterTest > testBlacklists PASSED

kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor STARTED

kafka.consumer.PartitionAssignorTest > testRoundRobinPartitionAssignor PASSED

kafka.consumer.PartitionAssignorTest > testRangePartitionAssignor STARTED

kafka.consumer.PartitionAssignorTest > testRangePartitionAssignor PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testBasic STARTED

kafka.consumer.ZookeeperConsumerConnectorTest > testBasic PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testCompressionSetConsumption 
STARTED

kafka.consumer.ZookeeperConsumerConnectorTest > testCompressionSetConsumption 
PASSED

kafka.consumer.ZookeeperConsumerConnectorTest > testLeaderSelectionForPartition 

[GitHub] kafka pull request #1885: Fix comments in KStreamKStreamJoinTest

2016-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1885




[jira] [Created] (KAFKA-4207) Partitions stopped after a rapid restart of a broker

2016-09-22 Thread Dustin Cote (JIRA)
Dustin Cote created KAFKA-4207:
--

 Summary: Partitions stopped after a rapid restart of a broker
 Key: KAFKA-4207
 URL: https://issues.apache.org/jira/browse/KAFKA-4207
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 0.10.0.1, 0.9.0.1
Reporter: Dustin Cote


Environment:
4 Kafka brokers
10,000 topics with one partition each, replication factor 3
Partitions with 4KB data each
No data being produced or consumed

Scenario:
Initiate controlled shutdown on one broker
Interrupt the controlled shutdown prior to completion with a SIGKILL
Immediately start a new broker with the same broker ID as the broker that was just 
killed

Symptoms:
After starting the new broker, the other three brokers in the cluster will see 
under-replicated partitions forever for some partitions that are hosted on the 
broker that was killed and restarted.

Cause:
Today, the controller sends a StopReplica command for each replica hosted on a 
broker that has initiated a controlled shutdown.  For a large number of 
replicas this can take a while.  When the broker that is doing the controlled 
shutdown is killed, the StopReplica commands remain queued up even though the 
request queue to the broker is cleared.  When the broker comes back online, the 
StopReplica commands that were queued get sent to the broker that just started 
up.

CC: [~junrao] since he's familiar with the scenario seen here




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4202) Facing error while trying to create the Producer.

2016-09-22 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-4202:
-
Assignee: (was: Ewen Cheslack-Postava)

> Facing error while trying to create the Producer.
> -
>
> Key: KAFKA-4202
> URL: https://issues.apache.org/jira/browse/KAFKA-4202
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rohan
>
> While trying to run the command 
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first-topic
> I am facing the below error.
> ERROR StatusLogger No log4j2 configuration file found. Using default 
> configuration: logging only errors to the console.
> Exception in thread "main" java.lang.NoSuchMethodError: 
> kafka.utils.CommandLineUtils$.parseKeyValueArgs(Lscala/collection/Iterable;)Ljava/util/Properties;
>   at 
> kafka.tools.ConsoleProducer$ProducerConfig.<init>(ConsoleProducer.scala:279)
>   at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:38)
>   at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-4202) Facing error while trying to create the Producer.

2016-09-22 Thread Ewen Cheslack-Postava (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ewen Cheslack-Postava updated KAFKA-4202:
-
Component/s: (was: KafkaConnect)

> Facing error while trying to create the Producer.
> -
>
> Key: KAFKA-4202
> URL: https://issues.apache.org/jira/browse/KAFKA-4202
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rohan
>Assignee: Ewen Cheslack-Postava
>
> While trying to run the command 
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic first-topic
> I am facing the below error.
> ERROR StatusLogger No log4j2 configuration file found. Using default 
> configuration: logging only errors to the console.
> Exception in thread "main" java.lang.NoSuchMethodError: 
> kafka.utils.CommandLineUtils$.parseKeyValueArgs(Lscala/collection/Iterable;)Ljava/util/Properties;
>   at 
> kafka.tools.ConsoleProducer$ProducerConfig.<init>(ConsoleProducer.scala:279)
>   at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:38)
>   at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3782) Transient failure with kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True

2016-09-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513829#comment-15513829
 ] 

ASF GitHub Bot commented on KAFKA-3782:
---

Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1898


> Transient failure with 
> kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True
> -
>
> Key: KAFKA-3782
> URL: https://issues.apache.org/jira/browse/KAFKA-3782
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Liquan Pei
>Assignee: Jason Gustafson
>Priority: Minor
> Fix For: 0.10.1.0, 0.10.2.0
>
>
> For commit 946ae60
> max() arg is an empty sequence
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/mark/_mark.py",
>  line 331, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
>  line 321, in test_bounce
> sink_seqno_max = max(sink_seqnos)
> ValueError: max() arg is an empty sequence



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Resolved] (KAFKA-3782) Transient failure with kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True

2016-09-22 Thread Guozhang Wang (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3782.
--
   Resolution: Fixed
Fix Version/s: (was: 0.10.0.1)
   0.10.2.0

Issue resolved by pull request 1898
[https://github.com/apache/kafka/pull/1898]

> Transient failure with 
> kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest.test_bounce.clean=True
> -
>
> Key: KAFKA-3782
> URL: https://issues.apache.org/jira/browse/KAFKA-3782
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Liquan Pei
>Assignee: Jason Gustafson
>Priority: Minor
> Fix For: 0.10.1.0, 0.10.2.0
>
>
> For commit 946ae60
> max() arg is an empty sequence
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape-0.5.1-py2.7.egg/ducktape/mark/_mark.py",
>  line 331, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/connect/connect_distributed_test.py",
>  line 321, in test_bounce
> sink_seqno_max = max(sink_seqnos)
> ValueError: max() arg is an empty sequence



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1898: KAFKA-3782: Ensure heartbeat thread restarted afte...

2016-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/1898


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (KAFKA-4206) Improve handling of invalid credentials to mitigate DOS issue (especially on SSL listeners)

2016-09-22 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-4206:
-
Description: 
The current handling of invalid credentials (ie wrong user/password) is to let 
the {{SaslException}} thrown from an implementation of 
{{javax.security.sasl.SaslServer.evaluateResponse()}}
bubble up the call stack until it gets caught in 
{{org.apache.kafka.common.network.Selector.pollSelectionKeys()}}
where the {{KafkaChannel}} gets closed - which will cause the client that made 
the request to be disconnected.

This will happen however after the server has used considerable resources, 
especially for the SSL handshake which appears to be computationally expensive 
in Java.

We have observed that if just a few clients keep repeating requests with the 
wrong credentials, it is quite easy to get all the network processing threads 
in the Kafka server busy doing SSL handshakes.

This makes it easy for a Kafka cluster to suffer a Denial of Service attack, 
possibly a non-intentional one. 
It can be non-intentional, i.e. caused by friendly clients: for example, a 
Kafka Java client Producer supplied with the wrong credentials will not throw 
an exception on publishing, so it may keep attempting to connect without the 
caller realising.

An easy fix which we have implemented and will supply a PR for is to *delay* 
considerably closing the {{KafkaChannel}} in the {{Selector}}, but obviously 
without blocking the processing thread.

This has been tested to be very effective in reducing the cpu usage spikes 
caused by non malicious clients using invalid SASL PLAIN credentials over SSL.


  was:
The current handling of invalid credentials (ie wrong user/password) is to let 
the {{SaslException}} thrown from an implementation of 
{{javax.security.sasl.SaslServer.evaluateResponse()}}
bubble up the call stack until it gets caught in 
{{org.apache.kafka.common.network.Selector.pollSelectionKeys()}}
where the `KafkaChannel` gets closed - which will cause the client that made 
the request to be disconnected.

This will happen however after the server has used considerable resources, 
especially for the SSL handshake which appears to be computationally expensive 
in Java.

We have observed that if just a few clients keep repeating requests with the 
wrong credentials, it is quite easy to get all the network processing threads 
in the Kafka server busy doing SSL handshakes.

This makes a Kafka cluster to easily suffer from a Denial Of Service - also non 
intentional  - attack. 
It can be non intentional, i.e. also caused by friendly clients, for example 
because a Kafka Java client Producer supplied with the wrong credentials will 
not throw an exception on publishing, so it may keep attempting to connect 
without the caller realising.

An easy fix which we have implemented and will supply a PR for is to *delay* 
considerably closing the `KafkaChannel` in the `Selector`, but obviously 
without blocking the processing thread.

This has be tested to be very effective in reducing the cpu usage spikes caused 
by non malicious ssl clients using invalid credentials.



> Improve handling of invalid credentials to mitigate DOS issue (especially on 
> SSL listeners)
> ---
>
> Key: KAFKA-4206
> URL: https://issues.apache.org/jira/browse/KAFKA-4206
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, security
>Affects Versions: 0.10.0.0, 0.10.0.1
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>
> The current handling of invalid credentials (ie wrong user/password) is to 
> let the {{SaslException}} thrown from an implementation of 
> {{javax.security.sasl.SaslServer.evaluateResponse()}}
> bubble up the call stack until it gets caught in 
> {{org.apache.kafka.common.network.Selector.pollSelectionKeys()}}
> where the {{KafkaChannel}} gets closed - which will cause the client that 
> made the request to be disconnected.
> This will happen however after the server has used considerable resources, 
> especially for the SSL handshake which appears to be computationally 
> expensive in Java.
> We have observed that if just a few clients keep repeating requests with the 
> wrong credentials, it is quite easy to get all the network processing threads 
> in the Kafka server busy doing SSL handshakes.
> This makes a Kafka cluster to easily suffer from a Denial Of Service - also 
> non intentional  - attack. 
> It can be non intentional, i.e. also caused by friendly clients, for example 
> because a Kafka Java client Producer supplied with the wrong credentials will 
> not throw an exception on publishing, so it may keep attempting to connect 
> without the caller realising.
> An easy fix which we have implemented and will supply a PR 

[jira] [Updated] (KAFKA-4206) Improve handling of invalid credentials to mitigate DOS issue (especially on SSL listeners)

2016-09-22 Thread Edoardo Comar (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Edoardo Comar updated KAFKA-4206:
-
Description: 
The current handling of invalid credentials (ie wrong user/password) is to let 
the {{SaslException}} thrown from an implementation of 
{{javax.security.sasl.SaslServer.evaluateResponse()}}
bubble up the call stack until it gets caught in 
{{org.apache.kafka.common.network.Selector.pollSelectionKeys()}}
where the `KafkaChannel` gets closed - which will cause the client that made 
the request to be disconnected.

This will happen however after the server has used considerable resources, 
especially for the SSL handshake which appears to be computationally expensive 
in Java.

We have observed that if just a few clients keep repeating requests with the 
wrong credentials, it is quite easy to get all the network processing threads 
in the Kafka server busy doing SSL handshakes.

This makes a Kafka cluster to easily suffer from a Denial Of Service - also non 
intentional  - attack. 
It can be non intentional, i.e. also caused by friendly clients, for example 
because a Kafka Java client Producer supplied with the wrong credentials will 
not throw an exception on publishing, so it may keep attempting to connect 
without the caller realising.

An easy fix which we have implemented and will supply a PR for is to *delay* 
considerably closing the `KafkaChannel` in the `Selector`, but obviously 
without blocking the processing thread.

This has be tested to be very effective in reducing the cpu usage spikes caused 
by non malicious ssl clients using invalid credentials.


  was:
The current handling of invalid credentials (ie wrong user/password) is to let 
the `SaslException` thrown from an implementation of 
`javax.security.sasl.SaslServer.evaluateResponse()`
bubble up the call stack until it gets caught in 
`org.apache.kafka.common.network.Selector.pollSelectionKeys()`
where the `KafkaChannel` gets closed - which will cause the client that made 
the request to be disconnected.

This will happen however after the server has used considerable resources, 
especially for the SSL handshake which appears to be computationally expensive 
in Java.

We have observed that if just a few clients keep repeating requests with the 
wrong credentials, it is quite easy to get all the network processing threads 
in the Kafka server busy doing SSL handshakes.

This makes a Kafka cluster to easily suffer from a Denial Of Service - also non 
intentional  - attack. 
It can be non intentional, i.e. also caused by friendly clients, for example 
because a Kafka Java client Producer supplied with the wrong credentials will 
not throw an exception on publishing, so it may keep attempting to connect 
without the caller realising.

An easy fix which we have implemented and will supply a PR for is to *delay* 
considerably closing the `KafkaChannel` in the `Selector`, but obviously 
without blocking the processing thread.

This has be tested to be very effective in reducing the cpu usage spikes caused 
by non malicious ssl clients using invalid credentials.



> Improve handling of invalid credentials to mitigate DOS issue (especially on 
> SSL listeners)
> ---
>
> Key: KAFKA-4206
> URL: https://issues.apache.org/jira/browse/KAFKA-4206
> Project: Kafka
>  Issue Type: Improvement
>  Components: network, security
>Affects Versions: 0.10.0.0, 0.10.0.1
>Reporter: Edoardo Comar
>Assignee: Edoardo Comar
>
> The current handling of invalid credentials (ie wrong user/password) is to 
> let the {{SaslException}} thrown from an implementation of 
> {{javax.security.sasl.SaslServer.evaluateResponse()}}
> bubble up the call stack until it gets caught in 
> {{org.apache.kafka.common.network.Selector.pollSelectionKeys()}}
> where the `KafkaChannel` gets closed - which will cause the client that made 
> the request to be disconnected.
> This will happen however after the server has used considerable resources, 
> especially for the SSL handshake which appears to be computationally 
> expensive in Java.
> We have observed that if just a few clients keep repeating requests with the 
> wrong credentials, it is quite easy to get all the network processing threads 
> in the Kafka server busy doing SSL handshakes.
> This makes a Kafka cluster to easily suffer from a Denial Of Service - also 
> non intentional  - attack. 
> It can be non intentional, i.e. also caused by friendly clients, for example 
> because a Kafka Java client Producer supplied with the wrong credentials will 
> not throw an exception on publishing, so it may keep attempting to connect 
> without the caller realising.
> An easy fix which we have implemented and will supply a PR for is to *delay* 
> considerably 

[jira] [Created] (KAFKA-4206) Improve handling of invalid credentials to mitigate DOS issue (especially on SSL listeners)

2016-09-22 Thread Edoardo Comar (JIRA)
Edoardo Comar created KAFKA-4206:


 Summary: Improve handling of invalid credentials to mitigate DOS 
issue (especially on SSL listeners)
 Key: KAFKA-4206
 URL: https://issues.apache.org/jira/browse/KAFKA-4206
 Project: Kafka
  Issue Type: Improvement
  Components: network, security
Affects Versions: 0.10.0.1, 0.10.0.0
Reporter: Edoardo Comar
Assignee: Edoardo Comar


The current handling of invalid credentials (ie wrong user/password) is to let 
the `SaslException` thrown from an implementation of 
`javax.security.sasl.SaslServer.evaluateResponse()`
bubble up the call stack until it gets caught in 
`org.apache.kafka.common.network.Selector.pollSelectionKeys()`
where the `KafkaChannel` gets closed - which will cause the client that made 
the request to be disconnected.

This will happen however after the server has used considerable resources, 
especially for the SSL handshake which appears to be computationally expensive 
in Java.

We have observed that if just a few clients keep repeating requests with the 
wrong credentials, it is quite easy to get all the network processing threads 
in the Kafka server busy doing SSL handshakes.

This makes it easy for a Kafka cluster to suffer a Denial of Service attack, 
possibly a non-intentional one. 
It can be non-intentional, i.e. caused by friendly clients: for example, a 
Kafka Java client Producer supplied with the wrong credentials will not throw 
an exception on publishing, so it may keep attempting to connect without the 
caller realising.

An easy fix which we have implemented and will supply a PR for is to *delay* 
considerably closing the `KafkaChannel` in the `Selector`, but obviously 
without blocking the processing thread.

This has been tested to be very effective in reducing the CPU usage spikes 
caused by non-malicious SSL clients using invalid credentials.
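
For illustration only, here is a minimal, self-contained sketch of the *delay the close* idea described above. None of the names below are the actual Kafka {{Selector}}/{{KafkaChannel}} code; they are hypothetical stand-ins, assuming a helper that is invoked after a failed SASL authentication.

{code:title=DelayedCloseSketch.java|borderStyle=solid}
// Hypothetical sketch, NOT the actual Kafka Selector/KafkaChannel code.
// Idea: after an authentication failure, close the connection only after a
// delay, so a misconfigured (or malicious) client cannot immediately reconnect
// and force another expensive SSL handshake. The delay must not block the
// network processing thread, hence the scheduled executor.
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class DelayedCloseSketch {
    private static final ScheduledExecutorService CLOSER =
            Executors.newSingleThreadScheduledExecutor();

    static void closeAfterAuthFailure(SocketChannel channel, long delayMs) {
        CLOSER.schedule(() -> {
            try {
                channel.close();   // the client is finally disconnected here
            } catch (Exception e) {
                // best effort: the channel may already have been closed
            }
        }, delayMs, TimeUnit.MILLISECONDS);
    }
}
{code}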




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4205) NullPointerException in fetchOffsetsBefore

2016-09-22 Thread Andrew Grasso (JIRA)
Andrew Grasso created KAFKA-4205:


 Summary: NullPointerException in fetchOffsetsBefore
 Key: KAFKA-4205
 URL: https://issues.apache.org/jira/browse/KAFKA-4205
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.9.0.1
Reporter: Andrew Grasso


We recently observed the following error in brokers running 0.9.0.1:

A client saw an Unknown error code in response to an offset request for TOPICX, 
partition 0.

The server logs look like:
{code}
[2016-09-21 21:26:07,143] INFO Scheduling log segment 527235760 for log 
TOPICX-0 for deletion. (kafka.log.Log)
[2016-09-21 21:26:07,144] ERROR [KafkaApi-13] Error while responding to offset 
request (kafka.server.KafkaApis)
java.lang.NullPointerException
at kafka.server.KafkaApis.fetchOffsetsBefore(KafkaApis.scala:513)
at kafka.server.KafkaApis.fetchOffsets(KafkaApis.scala:501)
at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:461)
at kafka.server.KafkaApis$$anonfun$18.apply(KafkaApis.scala:452)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at kafka.server.KafkaApis.handleOffsetRequest(KafkaApis.scala:452)
at kafka.server.KafkaApis.handle(KafkaApis.scala:70)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
at java.lang.Thread.run(Thread.java:745)
[2016-09-21 21:27:07,143] INFO Deleting segment 527235760 from log TOPICX-0. 
(kafka.log.Log)
[2016-09-21 21:27:07,263] INFO Deleting index 
/path/to/kafka/data/TOPICX-0/000527235760.index.deleted 
(kafka.log.OffsetIndex)
{code}
I suspect a race condition between {{Log.deleteSegment}} (which takes a lock on 
the log) and {{KafkaApis.fetchOffsetsBefore}}, which does not take any lock. In 
particular, line 513 in KafkaApis looks like:

{code:title=KafkaApis.scala|borderStyle=solid}
510  private def fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: 
Int): Seq[Long] = {
511val segsArray = log.logSegments.toArray
512var offsetTimeArray: Array[(Long, Long)] = null
513val lastSegmentHasSize = segsArray.last.size > 0;
{code}
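
For illustration, a small, self-contained Java sketch (hypothetical; the actual code above is Scala) of the defensive pattern that would avoid this kind of failure: snapshot the segment list once and explicitly handle the case where retention has emptied it, or invalidated the last entry, in the meantime.

{code:title=FetchOffsetsSketch.java|borderStyle=solid}
// Hypothetical illustration only, not the actual KafkaApis/Log code.
// The segment list can shrink concurrently (retention/deletion), so take a
// snapshot and guard the "no usable last segment" case instead of assuming
// the last element is always safe to dereference.
import java.util.List;

class FetchOffsetsSketch {
    static final class Segment {
        final long baseOffset;
        final long size;
        Segment(long baseOffset, long size) { this.baseOffset = baseOffset; this.size = size; }
    }

    /** Returns the base offset of the last non-empty segment, or null if there is none. */
    static Long lastNonEmptySegmentOffset(List<Segment> liveSegments) {
        Segment[] snapshot = liveSegments.toArray(new Segment[0]); // point-in-time copy
        for (int i = snapshot.length - 1; i >= 0; i--) {
            Segment s = snapshot[i];
            if (s != null && s.size > 0) {
                return s.baseOffset;
            }
        }
        return null; // caller can translate this into a clean error instead of an NPE
    }
}
{code}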



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4114) Allow for different "auto.offset.reset" strategies for different input streams

2016-09-22 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4114?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15513498#comment-15513498
 ] 

Bill Bejeck commented on KAFKA-4114:


Hi Matthias,  

I've started working on this and wanted to confirm my understanding.  

The additional parameter for auto-reset will need to be the same for all calls 
to {{KStreamBuilder.stream}} and {{KStreamBuilder.table}} for a given single 
instance of {{KStreamBuilder}}, but can differ for separate instances.

For example given two {{KStreamBuilder}} instances {{kb1}} and {{kb2}} you 
could do the following:

{{kb1.stream("foo", "earliest")}}
{{kb1.stream("bar", "earliest")}}

{{kb2.stream("baz", "latest")}}
{{kb2.table("bar", "latest")}}

but not this: 

{{kb1.stream("foo", "latest")}}
{{kb1.stream("bar", "earliest")}}
as this second case above would require multiple consumers in the underlying 
{{StreamThread}} of the {{KafkaStreams}} instance obtained from using {{kb1}} 
as a construction parameter.

Is this correct?
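
For contrast, a minimal sketch of how the reset policy is supplied today, as one global consumer-level setting for the whole application; the application id and bootstrap servers below are placeholders.

{code:title=GlobalResetToday.java|borderStyle=solid}
// Illustration only: today auto.offset.reset is a single consumer-level setting
// for the whole application, which is why per-builder/per-stream values are the
// interesting case discussed above. All values here are placeholders.
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class GlobalResetToday {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "reset-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // One value applied to every input topic of this application:
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        System.out.println(props);
    }
}
{code}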

> Allow for different "auto.offset.reset" strategies for different input streams
> --
>
> Key: KAFKA-4114
> URL: https://issues.apache.org/jira/browse/KAFKA-4114
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Bill Bejeck
>
> Today we only have one consumer config "offset.auto.reset" to control that 
> behavior, which means all streams are read either from "earliest" or "latest".
> However, it would be useful to improve this settings to allow users have 
> finer control over different input stream. For example, with two input 
> streams, one of them always reading from offset 0 upon (re)-starting, and the 
> other reading for log end offset.
> This JIRA requires to extend {{KStreamBuilder}} API for methods 
> {{.stream(...)}} and {{.table(...)}} to add a new parameter that indicate the 
> initial offset to be used.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] kafka pull request #1900: fix ambiguity in docs - consuming by multiple cons...

2016-09-22 Thread pilloPl
GitHub user pilloPl opened a pull request:

https://github.com/apache/kafka/pull/1900

fix ambiguity in docs - consuming by multiple consumers, but by exact…

In the doc it says:

"Our topic is divided into a set of totally ordered partitions, each of 
which is consumed by one consumer at any given time."

And consumer is described as: 

"We'll call **processes** that subscribe to topics and process the feed of 
published messages **consumers**."

Which might lead to a wrong conclusion - that each partition can be read by 
one process at any given time.

I think this statement misses information about **consumer groups**, so I 
propose:

"Our topic is divided into a set of totally ordered partitions, each of 
which is consumed by exactly one consumer (from each subscribed consumer 
groups) at any given time"

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pilloPl/kafka minor/doc-fix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1900.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1900


commit 2280b5681313540c21c3f30a1c7afd37e6943b3a
Author: pilo 
Date:   2016-09-22T12:37:15Z

fix ambiguity in docs - consuming by multiple consumers, but by exactly  
one from each given consumer groups




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Does Kafka 0.9 can guarantee not loss data

2016-09-22 Thread Kafka
Hi all, 
in terms of topic, we create a topic with 6 partitions, each with 3 replicas.
in terms of producer, we send messages with ack -1 using the sync interface.
in terms of brokers, we set min.insync.replicas to 2.
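
(For context, a minimal client-side sketch of this setup; broker hosts and topic are placeholders, and acks=all is the new-producer spelling of ack -1.)

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SyncAcksAllExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092");
        props.put("acks", "all"); // same as -1: wait for the ISR / high-watermark check
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // Blocking on the future makes the send synchronous, so a
            // NotEnoughReplicas(AfterAppend) error surfaces as an exception here.
            producer.send(new ProducerRecord<>("test-topic", "key", "value")).get();
        }
    }
}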

after we reviewed the kafka broker's code, we know that when we send a message to 
the broker with ack -1, we can get a response if the ISR of this partition is 
greater than or equal to min.insync.replicas. But what confused me is that 
replicas in the ISR are not strongly consistent: in kafka 0.9 we use the 
replica.lag.time.max.ms param to judge whether to shrink the ISR, and the default 
is 1 ms, so replicas' data in the ISR can lag 1ms at most. When we restart the 
broker which owns this partition's leader, the controller will start a new leader 
election, which will choose the first replica in the ISR that is not the current 
leader as the new leader, and then this will lose data.


The main produce handle code shows below:
val numAcks = curInSyncReplicas.count(r => {
  if (!r.isLocal)
if (r.logEndOffset.messageOffset >= requiredOffset) {
  trace("Replica %d of %s-%d received offset %d".format(r.brokerId, 
topic, partitionId, requiredOffset))
  true
}
else
  false
  else
true /* also count the local (leader) replica */
})

trace("%d acks satisfied for %s-%d with acks = -1".format(numAcks, 
topic, partitionId))

val minIsr = leaderReplica.log.get.config.minInSyncReplicas

if (leaderReplica.highWatermark.messageOffset >= requiredOffset ) {
  /*
  * The topic may be configured not to accept messages if there are not 
enough replicas in ISR
  * in this scenario the request was already appended locally and then 
added to the purgatory before the ISR was shrunk
  */
  if (minIsr <= curInSyncReplicas.size) {
(true, ErrorMapping.NoError)
  } else {
(true, ErrorMapping.NotEnoughReplicasAfterAppendCode)
  }
} else
  (false, ErrorMapping.NoError)


why is numAcks only logged and not compared with minIsr? If numAcks is 0 but 
curInSyncReplicas.size >= minIsr, then this will return; as the ISR shrink 
procedure is not real time, will this lose data after a leader election?

Feedback is greatly appreciated. Thanks.
meituan.inf





[GitHub] kafka pull request #1899: HOTFIX: Decrease commit interval

2016-09-22 Thread enothereska
GitHub user enothereska opened a pull request:

https://github.com/apache/kafka/pull/1899

HOTFIX: Decrease commit interval

The original commit interval of 30 seconds might be too large in some 
cases, e.g., when the verifier finishes before those 30 seconds have elapsed.
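
    For reference, a hedged sketch of what the change amounts to on the config
    side; the application id and bootstrap servers are placeholders.

    // Illustration only: lowering the Streams commit interval via
    // commit.interval.ms; application id and bootstrap servers are placeholders.
    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class CommitIntervalExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-smoke-test");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // e.g. 1s instead of 30s
            System.out.println(props);
        }
    }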

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/enothereska/kafka 
hotfix-smoke-test-commit-interval

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/1899.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1899


commit 77eaf5e06c248e96beedfb48a10f14f9cb93be24
Author: Eno Thereska 
Date:   2016-09-22T09:01:11Z

Decrease commit interval




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-54: Sticky Partition Assignment Strategy

2016-09-22 Thread Mickael Maison
+1 (non-binding)

On Thu, Sep 15, 2016 at 8:32 PM, Bill Bejeck  wrote:
> +1
>
> On Thu, Sep 15, 2016 at 5:16 AM, Rajini Sivaram <
> rajinisiva...@googlemail.com> wrote:
>
>> +1 (non-binding)
>>
>> On Wed, Sep 14, 2016 at 12:37 AM, Jason Gustafson 
>> wrote:
>>
>> > Thanks for the KIP. +1 from me.
>> >
>> > On Tue, Sep 13, 2016 at 12:05 PM, Vahid S Hashemian <
>> > vahidhashem...@us.ibm.com> wrote:
>> >
>> > > Hi all,
>> > >
>> > > Thanks for providing feedback on this KIP so far.
>> > > The KIP was discussed during the KIP meeting today and there doesn't
>> seem
>> > > to be any unaddressed issue at this point.
>> > >
>> > > So I would like to initiate the voting process.
>> > >
>> > > The KIP can be found here:
>> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>> > > 54+-+Sticky+Partition+Assignment+Strategy
>> > > And the full discussion thread is here:
>> > > https://www.mail-archive.com/dev@kafka.apache.org/msg47607.html
>> > >
>> > > Thanks.
>> > > --Vahid
>> > >
>> > >
>> >
>>
>>
>>
>> --
>> Regards,
>>
>> Rajini
>>


Re: ProducerRecord/Consumer MetaData/Headers

2016-09-22 Thread Ismael Juma
Sorry for the delay, you should have access now.

Ismael

On Thu, Sep 22, 2016 at 8:15 AM, Michael Pearce 
wrote:

> Hi again,
>
> Sorry to be nudging this, but it seems I'm still unable to create a page
> in the KIP Proposal area. Just don't want to be forgotten about between all
> the emails.
>
> Cheers
> Mike
> 
> From: Michael Pearce 
> Sent: Monday, September 19, 2016 6:19 PM
> To: dev@kafka.apache.org
> Subject: Re: ProducerRecord/Consumer MetaData/Headers
>
> Hi Again,
>
> I went to the wiki this afternoon to start writing it up, and seems I
> cannot create a page under the KIP area still. If someone could assist.
>
> Cheers
> Mike
>
>
> On 9/18/16, 7:07 PM, "Michael Pearce"  wrote:
>
> Hi Ismaelj
>
> Thanks, my wiki user is michael.andre.pearce.
>
> Re the link thanks again, actually indeed we started off trying to do
> this after we lost the ability to use the key to hold metadata once the
> compaction feature came, but it actually abusing the payload isn't imo a
> great solution, and has some issues that cannot be overcome and stopping us
> from using in some of our data / message flows. As such I think a solution
> in the broker/message/client needs to be made and formalised. Also then an
> ecosystems of tools could rely on such.
>
> I will add all details in KIP proposal, once I have access.
>
> Cheers
> Mike
>
>
> 
> From: isma...@gmail.com  on behalf of Ismael Juma <
> ism...@juma.me.uk>
> Sent: Sunday, September 18, 2016 9:01:22 AM
> To: dev@kafka.apache.org
> Subject: Re: ProducerRecord/Consumer MetaData/Headers
>
> Hi Mike,
>
> If you give me your wiki user name, I can give you the required
> permissions
> to post a KIP. This is definitely a big change and there is no clear
> consensus if changing the Kafka message format is the right way (it
> would
> be good not to pay the cost if you don't need it) or if it should be
> done
> via schemas, for example. Gwen shared some thoughts in the following
> message:
>
> http://search-hadoop.com/m/uyzND1OXS8EoGCU2
>
> Ismael
>
> On Sun, Sep 18, 2016 at 7:11 AM, Michael Pearce  >
> wrote:
>
> > Hi All, (again)
> >
> > If it helps the discussion, and almost ready patch implementing this
> is
> > available here:
> >
> > https://github.com/michaelandrepearce/kafka
> >
> >
> > The biggest/most core change is obviously the kafka.message.Message
> object.
> >
> >
> >
> > Some key bits in this implementation is the server side, and
> submodules
> > (connect, mirrormaker, streams) all updated to be aware of the new
> “headers”
> >
> >
> >
> > As a big API change have to use new feature on the client side, you
> use
> > the ConsumerRecord and ProducerRecord (which now extend the new
> Enhanced
> > versions) for K,V records without any code changes, to use the
> headers you
> > use the Enhanced versions HeadersConsumerRecord and
> HeadersProducerRecord.
> > This was needed to avoid causing code compilation failure just by
> > upgrading. If the patch were accepted I would imagine it as a way to
> > transition.
> >
> >
> >
> > I am guessing this needs a KIP rather than just myself raising a
> JIRA as
> > fairly substantial api change but unsure whom can raise these so
> assistance
> > in the process would be gratefully accepted..
> >
> >
> >
> > Cheers
> >
> > Mike
> >
> >
> >
> >
> >
> >
> > From: Michael Pearce 
> > Date: Saturday, September 17, 2016 at 6:40 AM
> > To: "dev@kafka.apache.org" 
> > Subject: ProducerRecord/Consumer MetaData/Headers
> >
> > Hi All,
> >
> > First of all apologies if this has been previously discussed I have
> just
> > joined the mail list (I cannot find a JIRA or KIP related nor via
> good old
> > google search)
> >
> > In our company we are looking to replace some of our more traditional
> > message flows with Kafka.
> >
> > One thing we have found lacking though compared with most messaging
> > systems is the ability to set header/metadata separate from our
> payload. We
> > did think about the key, but as this is used for compaction we
> cannot have
> > changing values here which metadata/header values will obviously be.
> >
> > e.g. these headers/metadata are useful for audit data or platform
> data
> > that is not business payload related e.g. storing
> > the clientId that generated the message, correlation id of a
> > request/response, cluster id where the message was generate (case for
> > MirrorMakers), message uuid etc this list is endless.
>   

Unable to locate auto.create.topics.enable=true path for KafkaProducer

2016-09-22 Thread UMESH CHAUDHARY
Hi Mates,
I was trying to understand, when auto.create.topics.enable=true, how
KafkaProducer first creates the topic and then sends messages to it.

What I saw:

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback)

method in KafkaProducer.java.

What I failed to get:

When getting metadata for topic using

ClusterAndWaitTime clusterAndWaitTime =
waitOnMetadata(record.topic(), this.maxBlockTimeMs);
line, I failed to locate the path where we don't have any metadata for
the topic (i.e. the topic doesn't exist) and where, according to
"auto.create.topics.enable=true", KafkaProducer invokes createTopic before
sending the record.

It works flawlessly with clients so it must be coded somewhere. I also looked at
MockProducerTest.java but was unable to locate it.

I am a newbie so please forgive for any stupidity here.

Regards,
Umesh


[jira] [Commented] (KAFKA-4178) Replication Throttling: Consolidate Rate Classes

2016-09-22 Thread Ben Stopford (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15512485#comment-15512485
 ] 

Ben Stopford commented on KAFKA-4178:
-

Thanks Joel. Actually this comment, from that thread, makes some sense: 
"Basically, we were having issues with very large metric values when the metric 
was very recently created."

My guess is this difference in requirements comes from the fact that client 
quotas throttle by imposing a delay, so if you overestimate the metric, as is 
possible when using the Elapsed Window method, you could calculate a very long 
delay which might cause a client to time out. Replication throttling doesn't 
have this issue in the same way, as an overestimate will only affect replication 
for as long as the metric is actually overestimated, which is never more than 
one or two sub-windows in practice. But replication throttling does have an 
issue with the Fixed Window approach, as it consistently underestimates for 
the entire first window (i.e. ten sub-windows). 

So if we really want to merge the approaches: I actually implemented another 
type of rate (removed from this PR for simplicity), so I'll bring it up here. 
You can see it in this commit; it's called FixedSubWindowPolicy: 
https://github.com/benstopford/kafka/blob/edb51d1d0df04b06a980940f9688a0ab06112784/clients/src/main/java/org/apache/kafka/common/metrics/stats/Window.java

This is essentially a simple hybrid of both approaches. If we really want to 
consolidate on one approach, this hybrid approach would be best I believe. I'll 
replicate it here as it's very simple:

{code:title=Window.java|borderStyle=solid}
/**
 * This policy fixes the first sub-window. If measurements do not span
 * more than one sub-window then the whole sub-window duration is used
 * to calculate the rate.
 *
 * However if there are measurements spanning multiple sub windows this rate
 * behaves identically to the elapsed window policy.
 *
 * So this provides a slow start, in a similar fashion to FixedWindows,
 * but only over the duration of the first sub-window rather than all
 * sub-windows.
 *
 * This policy provides a balance between the other two. It has a short
 * "slow start", in comparison to the Fixed policy, after which it will have
 * the accuracy of the Elapsed policy.
 */
private static class FixedSubWindowPolicy implements Policy {
@Override
public long windowSize(long first, long last, MetricConfig config) {
long elapsed = last - first;
return elapsed < config.timeWindowMs() ? config.timeWindowMs() : 
elapsed;
}
} 
{code}
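
To make the difference concrete, here is a small standalone illustration (hypothetical numbers, not Kafka code) of the rate each policy would report for a metric that is only 200ms old, assuming a 1-second sub-window and 10 sub-windows:

{code:title=WindowPolicyIllustration.java|borderStyle=solid}
// Standalone illustration of the three policies discussed above; not Kafka code.
public class WindowPolicyIllustration {
    static final long SUB_WINDOW_MS = 1_000L; // assumed sub-window size
    static final int NUM_SUB_WINDOWS = 10;    // assumed sub-window count

    // Fixed: always the full window, so a young metric's rate is underestimated.
    static long fixedWindow(long elapsedMs) { return SUB_WINDOW_MS * NUM_SUB_WINDOWS; }

    // Elapsed: only the observed span, so a very young metric can look huge.
    static long elapsedWindow(long elapsedMs) { return elapsedMs; }

    // Hybrid (FixedSubWindowPolicy above): never smaller than one sub-window.
    static long fixedSubWindow(long elapsedMs) {
        return elapsedMs < SUB_WINDOW_MS ? SUB_WINDOW_MS : elapsedMs;
    }

    public static void main(String[] args) {
        long elapsedMs = 200;   // metric created 200 ms ago
        double bytes = 100.0;   // 100 bytes recorded so far
        System.out.printf("fixed   : %.1f B/s%n", bytes * 1000 / fixedWindow(elapsedMs));
        System.out.printf("elapsed : %.1f B/s%n", bytes * 1000 / elapsedWindow(elapsedMs));
        System.out.printf("hybrid  : %.1f B/s%n", bytes * 1000 / fixedSubWindow(elapsedMs));
    }
}
{code}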

So this approach will only underestimate in the first sub-window (rather than 
all 10 in Fixed, or just the first measurement in Elapsed), so unless your 
sub-window size is small in relation to the measurement frequency, it should 
work well for client throttling. 

Certainly it appears the best compromise to me. Alternatively we just stick 
with both approaches. I still think there is a reasonable argument for both. 

> Replication Throttling: Consolidate Rate Classes
> 
>
> Key: KAFKA-4178
> URL: https://issues.apache.org/jira/browse/KAFKA-4178
> Project: Kafka
>  Issue Type: Improvement
>  Components: replication
>Affects Versions: 0.10.1.0
>Reporter: Ben Stopford
>
> Replication throttling is using a different implementation of Rate to client 
> throttling (Rate & SimpleRate). These should be consolidated so both use the 
> same approach. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


Re: ProducerRecord/Consumer MetaData/Headers

2016-09-22 Thread Michael Pearce
Hi again,

Sorry to be nudging this, but it seems I'm still unable to create a page in the 
KIP Proposal area. Just don't want to be forgotten about between all the emails.

Cheers
Mike

From: Michael Pearce 
Sent: Monday, September 19, 2016 6:19 PM
To: dev@kafka.apache.org
Subject: Re: ProducerRecord/Consumer MetaData/Headers

Hi Again,

I went to the wiki this afternoon to start writing it up, and seems I cannot 
create a page under the KIP area still. If someone could assist.

Cheers
Mike


On 9/18/16, 7:07 PM, "Michael Pearce"  wrote:

Hi Ismaelj

Thanks, my wiki user is michael.andre.pearce.

Re the link thanks again, actually indeed we started off trying to do this 
after we lost the ability to use the key to hold metadata once the compaction 
feature came, but it actually abusing the payload isn't imo a great solution, 
and has some issues that cannot be overcome and stopping us from using in some 
of our data / message flows. As such I think a solution in the 
broker/message/client needs to be made and formalised. Also then an ecosystems 
of tools could rely on such.

I will add all details in KIP proposal, once I have access.

Cheers
Mike



From: isma...@gmail.com  on behalf of Ismael Juma 

Sent: Sunday, September 18, 2016 9:01:22 AM
To: dev@kafka.apache.org
Subject: Re: ProducerRecord/Consumer MetaData/Headers

Hi Mike,

If you give me your wiki user name, I can give you the required permissions
to post a KIP. This is definitely a big change and there is no clear
consensus if changing the Kafka message format is the right way (it would
be good not to pay the cost if you don't need it) or if it should be done
via schemas, for example. Gwen shared some thoughts in the following
message:

http://search-hadoop.com/m/uyzND1OXS8EoGCU2

Ismael

On Sun, Sep 18, 2016 at 7:11 AM, Michael Pearce 
wrote:

> Hi All, (again)
>
> If it helps the discussion, and almost ready patch implementing this is
> available here:
>
> https://github.com/michaelandrepearce/kafka
>
>
> The biggest/most core change is obviously the kafka.message.Message 
object.
>
>
>
> Some key bits in this implementation is the server side, and submodules
> (connect, mirrormaker, streams) all updated to be aware of the new 
“headers”
>
>
>
> As a big API change have to use new feature on the client side, you use
> the ConsumerRecord and ProducerRecord (which now extend the new Enhanced
> versions) for K,V records without any code changes, to use the headers you
> use the Enhanced versions HeadersConsumerRecord and HeadersProducerRecord.
> This was needed to avoid causing code compilation failure just by
> upgrading. If the patch were accepted I would imagine it as a way to
> transition.
>
>
>
> I am guessing this needs a KIP rather than just myself raising a JIRA as
> fairly substantial api change but unsure whom can raise these so 
assistance
> in the process would be gratefully accepted..
>
>
>
> Cheers
>
> Mike
>
>
>
>
>
>
> From: Michael Pearce 
> Date: Saturday, September 17, 2016 at 6:40 AM
> To: "dev@kafka.apache.org" 
> Subject: ProducerRecord/Consumer MetaData/Headers
>
> Hi All,
>
> First of all apologies if this has been previously discussed I have just
> joined the mail list (I cannot find a JIRA or KIP related nor via good old
> google search)
>
> In our company we are looking to replace some of our more traditional
> message flows with Kafka.
>
> One thing we have found lacking though compared with most messaging
> systems is the ability to set header/metadata separate from our payload. 
We
> did think about the key, but as this is used for compaction we cannot have
> changing values here which metadata/header values will obviously be.
>
> e.g. these headers/metadata are useful for audit data or platform data
> that is not business payload related e.g. storing
> the clientId that generated the message, correlation id of a
> request/response, cluster id where the message was generate (case for
> MirrorMakers), message uuid etc this list is endless.
>
> We would like to propose extending the Record from
> ProducerRecord<K, V>/ConsumerRecord<K, V> to
> ProducerRecord<K, V, M>/ConsumerRecord<K, V, M>,
> where M is metadata/headers, again being, like the key and value, a simple
> byte[], so that it is completely up to the end users how to serialize /
> deserialize it.
>
> What our people’s thoughts?
> Any other ideas how to add headers/metadata.