Jenkins build is back to normal : kafka-trunk-jdk8 #1291

2017-02-21 Thread Apache Jenkins Server
See 




[jira] [Commented] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-21 Thread Huadong Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877674#comment-15877674
 ] 

Huadong Liu commented on KAFKA-4762:


[~huxi_2b] Thanks for your fast response! 9925056036 is the offending offset 
reported in the exception. If I understand correctly, all messages at 
position 191713371 were compressed into a single message, which may have 
exceeded max.partition.fetch.bytes.

[~neoeahit] Do you have the log dump after offset 9925056039?
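
Not part of the ticket, just for orientation: a minimal consumer sketch showing the config knob under discussion. Broker address, topic name, serializers and the 2 MB figure are placeholders; the point is only that max.partition.fetch.bytes has to be at least the size of the largest (possibly compressed wrapper) message in the partition.

{code}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative only: raise the per-partition fetch limit above the 512 KB that
// triggered the RecordTooLargeException in this ticket.
public class LargeFetchConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "large-fetch-test");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        // 2 MB instead of the 512 KB that was too small for the compressed wrapper message.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 2 * 1024 * 1024);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("topic_name"));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
            records.forEach(r ->
                System.out.printf("offset=%d valueSize=%d%n", r.offset(), r.value().length));
        }
    }
}
{code}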

> Consumer throwing RecordTooLargeException even when messages are not that 
> large
> ---
>
> Key: KAFKA-4762
> URL: https://issues.apache.org/jira/browse/KAFKA-4762
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Vipul Singh
>
> We were recently hit by a weird error.
> Before going any further, some context on our service setup: we have a 
> producer which produces messages no larger than 256 kb (we have an explicit 
> check for this on the producer side), and on the client side we have a fetch 
> limit of 512 kb (max.partition.fetch.bytes is set to 524288 bytes).
> Recently our client started to see this error:
> {quote}
> org.apache.kafka.common.errors.RecordTooLargeException: There are some 
> messages at [Partition=Offset]: {topic_name-0=9925056036} whose size is 
> larger than the fetch size 524288 and hence cannot be ever returned. Increase 
> the fetch size, or decrease the maximum message size the broker will allow.
> {quote}
> We tried consuming messages with another consumer, without any 
> max.partition.fetch.bytes limit, and it consumed fine. The messages were 
> small, and did not seem to be greater than 256 kb.
> We took a log dump, and the message sizes looked fine.
> {quote}
> [...] compresscodec: NoCompressionCodec crc: 2473548911 keysize: 8
> offset: 9925056032 position: 191380053 isvalid: true payloadsize: 539 magic: 0 compresscodec: NoCompressionCodec crc: 1656420267 keysize: 8
> offset: 9925056033 position: 191380053 isvalid: true payloadsize: 1551 magic: 0 compresscodec: NoCompressionCodec crc: 2398479758 keysize: 8
> offset: 9925056034 position: 191380053 isvalid: true payloadsize: 1307 magic: 0 compresscodec: NoCompressionCodec crc: 2845554215 keysize: 8
> offset: 9925056035 position: 191380053 isvalid: true payloadsize: 1520 magic: 0 compresscodec: NoCompressionCodec crc: 3106984195 keysize: 8
> offset: 9925056036 position: 191713371 isvalid: true payloadsize: 1207 magic: 0 compresscodec: NoCompressionCodec crc: 3462154435 keysize: 8
> offset: 9925056037 position: 191713371 isvalid: true payloadsize: 418 magic: 0 compresscodec: NoCompressionCodec crc: 1536701802 keysize: 8
> offset: 9925056038 position: 191713371 isvalid: true payloadsize: 299 magic: 0 compresscodec: NoCompressionCodec crc: 4112567543 keysize: 8
> offset: 9925056039 position: 191713371 isvalid: true payloadsize: 1571 magic: 0 compresscodec: NoCompressionCodec crc: 3696994307 keysize: 8
> {quote}
> Has anyone seen something similar? Any pointers to troubleshoot this further?
> Please note: to overcome this issue, we deployed a new consumer without this 
> limit on max.partition.fetch.bytes, and it worked fine.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-21 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877641#comment-15877641
 ] 

huxi edited comment on KAFKA-4762 at 2/22/17 7:16 AM:
--

[~huadongliu] You saw `NoCompressionCodec` in the output because of the 
`deep-iteration` option, which forces a deep iteration of the compressed 
messages. 

However, it's easy to see that many messages share the same position, which 
means the producer has compression enabled even though it's not explicitly set 
on the broker side. 


was (Author: huxi_2b):
[~huadongliu] The reason you saw `NoCompressionCodec` in the output is because 
of the effect of the config `deep-iteration` which enforces a deep interation 
of the compressed messages. 

However, it's easy to find out that many messages share a same position which 
means the producer enables the compression if it's not explicitly set on broker 
side. 

> Consumer throwing RecordTooLargeException even when messages are not that 
> large
> ---
>
> Key: KAFKA-4762
> URL: https://issues.apache.org/jira/browse/KAFKA-4762
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Vipul Singh
>
> We were recently hit by a weird error.
> Before going any further, some context on our service setup: we have a 
> producer which produces messages no larger than 256 kb (we have an explicit 
> check for this on the producer side), and on the client side we have a fetch 
> limit of 512 kb (max.partition.fetch.bytes is set to 524288 bytes).
> Recently our client started to see this error:
> {quote}
> org.apache.kafka.common.errors.RecordTooLargeException: There are some 
> messages at [Partition=Offset]: {topic_name-0=9925056036} whose size is 
> larger than the fetch size 524288 and hence cannot be ever returned. Increase 
> the fetch size, or decrease the maximum message size the broker will allow.
> {quote}
> We tried consuming messages with another consumer, without any 
> max.partition.fetch.bytes limit, and it consumed fine. The messages were 
> small, and did not seem to be greater than 256 kb.
> We took a log dump, and the message sizes looked fine.
> {quote}
> [...] compresscodec: NoCompressionCodec crc: 2473548911 keysize: 8
> offset: 9925056032 position: 191380053 isvalid: true payloadsize: 539 magic: 0 compresscodec: NoCompressionCodec crc: 1656420267 keysize: 8
> offset: 9925056033 position: 191380053 isvalid: true payloadsize: 1551 magic: 0 compresscodec: NoCompressionCodec crc: 2398479758 keysize: 8
> offset: 9925056034 position: 191380053 isvalid: true payloadsize: 1307 magic: 0 compresscodec: NoCompressionCodec crc: 2845554215 keysize: 8
> offset: 9925056035 position: 191380053 isvalid: true payloadsize: 1520 magic: 0 compresscodec: NoCompressionCodec crc: 3106984195 keysize: 8
> offset: 9925056036 position: 191713371 isvalid: true payloadsize: 1207 magic: 0 compresscodec: NoCompressionCodec crc: 3462154435 keysize: 8
> offset: 9925056037 position: 191713371 isvalid: true payloadsize: 418 magic: 0 compresscodec: NoCompressionCodec crc: 1536701802 keysize: 8
> offset: 9925056038 position: 191713371 isvalid: true payloadsize: 299 magic: 0 compresscodec: NoCompressionCodec crc: 4112567543 keysize: 8
> offset: 9925056039 position: 191713371 isvalid: true payloadsize: 1571 magic: 0 compresscodec: NoCompressionCodec crc: 3696994307 keysize: 8
> {quote}
> Has anyone seen something similar? Any pointers to troubleshoot this further?
> Please note: to overcome this issue, we deployed a new consumer without this 
> limit on max.partition.fetch.bytes, and it worked fine.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-21 Thread huxi (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877641#comment-15877641
 ] 

huxi commented on KAFKA-4762:
-

[~huadongliu] You saw `NoCompressionCodec` in the output because of the 
`deep-iteration` option, which forces a deep iteration of the compressed 
messages. 

However, it's easy to see that many messages share the same position, which 
means the producer has compression enabled even though it's not explicitly set 
on the broker side. 
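
For readers following along, a minimal producer sketch of the situation described above. Broker address, topic and payloads are placeholders; the point is that compression is a client-side setting, and a compressed batch is stored as one wrapper message, which is why consecutive offsets in the dump can share a file position even though deep iteration prints NoCompressionCodec for the inner records.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative only: a producer with client-side compression enabled. The batch
// built below is typically written to the log as a single compressed wrapper
// message covering several consecutive offsets (old message format).
public class CompressedBatchProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip"); // producer-side, not a broker setting
        props.put(ProducerConfig.LINGER_MS_CONFIG, 50);            // encourage batching

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 8; i++) {
                producer.send(new ProducerRecord<>("topic_name", "key-" + i, "value-" + i));
            }
        } // close() flushes, so the 8 records usually go out as one compressed batch
    }
}
{code}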

> Consumer throwing RecordTooLargeException even when messages are not that 
> large
> ---
>
> Key: KAFKA-4762
> URL: https://issues.apache.org/jira/browse/KAFKA-4762
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Vipul Singh
>
> We were recently hit by a weird error.
> Before going any further, some context on our service setup: we have a 
> producer which produces messages no larger than 256 kb (we have an explicit 
> check for this on the producer side), and on the client side we have a fetch 
> limit of 512 kb (max.partition.fetch.bytes is set to 524288 bytes).
> Recently our client started to see this error:
> {quote}
> org.apache.kafka.common.errors.RecordTooLargeException: There are some 
> messages at [Partition=Offset]: {topic_name-0=9925056036} whose size is 
> larger than the fetch size 524288 and hence cannot be ever returned. Increase 
> the fetch size, or decrease the maximum message size the broker will allow.
> {quote}
> We tried consuming messages with another consumer, without any 
> max.partition.fetch.bytes limit, and it consumed fine. The messages were 
> small, and did not seem to be greater than 256 kb.
> We took a log dump, and the message sizes looked fine.
> {quote}
> [...] compresscodec: NoCompressionCodec crc: 2473548911 keysize: 8
> offset: 9925056032 position: 191380053 isvalid: true payloadsize: 539 magic: 0 compresscodec: NoCompressionCodec crc: 1656420267 keysize: 8
> offset: 9925056033 position: 191380053 isvalid: true payloadsize: 1551 magic: 0 compresscodec: NoCompressionCodec crc: 2398479758 keysize: 8
> offset: 9925056034 position: 191380053 isvalid: true payloadsize: 1307 magic: 0 compresscodec: NoCompressionCodec crc: 2845554215 keysize: 8
> offset: 9925056035 position: 191380053 isvalid: true payloadsize: 1520 magic: 0 compresscodec: NoCompressionCodec crc: 3106984195 keysize: 8
> offset: 9925056036 position: 191713371 isvalid: true payloadsize: 1207 magic: 0 compresscodec: NoCompressionCodec crc: 3462154435 keysize: 8
> offset: 9925056037 position: 191713371 isvalid: true payloadsize: 418 magic: 0 compresscodec: NoCompressionCodec crc: 1536701802 keysize: 8
> offset: 9925056038 position: 191713371 isvalid: true payloadsize: 299 magic: 0 compresscodec: NoCompressionCodec crc: 4112567543 keysize: 8
> offset: 9925056039 position: 191713371 isvalid: true payloadsize: 1571 magic: 0 compresscodec: NoCompressionCodec crc: 3696994307 keysize: 8
> {quote}
> Has anyone seen something similar? Any pointers to troubleshoot this further?
> Please note: to overcome this issue, we deployed a new consumer without this 
> limit on max.partition.fetch.bytes, and it worked fine.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-4762) Consumer throwing RecordTooLargeException even when messages are not that large

2017-02-21 Thread Huadong Liu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4762?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877616#comment-15877616
 ] 

Huadong Liu commented on KAFKA-4762:


Hi [~huxi_2b], you are right. The RecordTooLargeException happened on our 
0.9.0.1 consumer. How did you tell that compression is enabled on the producer? 
Our Python producer derives from SimpleProducer and does not enable 
compression: 
https://github.com/dpkp/kafka-python/blob/8fde79dbb5a3793b1a9ebd10e032d5f3dd535645/kafka/producer/base.py#L281.
The log dump [~neoeahit] posted shows "compresscodec: NoCompressionCodec".

What did you mean by "the whole compressed message is often much larger than a 
single one"? Were you talking about batching? Thanks for your insights on this.

> Consumer throwing RecordTooLargeException even when messages are not that 
> large
> ---
>
> Key: KAFKA-4762
> URL: https://issues.apache.org/jira/browse/KAFKA-4762
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Vipul Singh
>
> We were recently hit by a weird error.
> Before going any further, some context on our service setup: we have a 
> producer which produces messages no larger than 256 kb (we have an explicit 
> check for this on the producer side), and on the client side we have a fetch 
> limit of 512 kb (max.partition.fetch.bytes is set to 524288 bytes).
> Recently our client started to see this error:
> {quote}
> org.apache.kafka.common.errors.RecordTooLargeException: There are some 
> messages at [Partition=Offset]: {topic_name-0=9925056036} whose size is 
> larger than the fetch size 524288 and hence cannot be ever returned. Increase 
> the fetch size, or decrease the maximum message size the broker will allow.
> {quote}
> We tried consuming messages with another consumer, without any 
> max.partition.fetch.bytes limit, and it consumed fine. The messages were 
> small, and did not seem to be greater than 256 kb.
> We took a log dump, and the message sizes looked fine.
> {quote}
> [...] compresscodec: NoCompressionCodec crc: 2473548911 keysize: 8
> offset: 9925056032 position: 191380053 isvalid: true payloadsize: 539 magic: 0 compresscodec: NoCompressionCodec crc: 1656420267 keysize: 8
> offset: 9925056033 position: 191380053 isvalid: true payloadsize: 1551 magic: 0 compresscodec: NoCompressionCodec crc: 2398479758 keysize: 8
> offset: 9925056034 position: 191380053 isvalid: true payloadsize: 1307 magic: 0 compresscodec: NoCompressionCodec crc: 2845554215 keysize: 8
> offset: 9925056035 position: 191380053 isvalid: true payloadsize: 1520 magic: 0 compresscodec: NoCompressionCodec crc: 3106984195 keysize: 8
> offset: 9925056036 position: 191713371 isvalid: true payloadsize: 1207 magic: 0 compresscodec: NoCompressionCodec crc: 3462154435 keysize: 8
> offset: 9925056037 position: 191713371 isvalid: true payloadsize: 418 magic: 0 compresscodec: NoCompressionCodec crc: 1536701802 keysize: 8
> offset: 9925056038 position: 191713371 isvalid: true payloadsize: 299 magic: 0 compresscodec: NoCompressionCodec crc: 4112567543 keysize: 8
> offset: 9925056039 position: 191713371 isvalid: true payloadsize: 1571 magic: 0 compresscodec: NoCompressionCodec crc: 3696994307 keysize: 8
> {quote}
> Has anyone seen something similar? Any pointers to troubleshoot this further?
> Please note: to overcome this issue, we deployed a new consumer without this 
> limit on max.partition.fetch.bytes, and it worked fine.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Build failed in Jenkins: kafka-trunk-jdk7 #1957

2017-02-21 Thread Apache Jenkins Server
See 


Changes:

[becket.qin] KAFKA-4757; NetworkClient should log request details at trace 
level when

--
[...truncated 614.04 KB...]
org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[9] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[9] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[9] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[9] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[9] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[10] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[10] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[10] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[10] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[10] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[10] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[10] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[10] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[11] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[11] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[11] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[11] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[11] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[11] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[11] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[11] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[12] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[12] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[12] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[12] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[12] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[12] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[12] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[12] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[13] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[13] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[13] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[13] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[13] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[13] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[13] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[13] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[14] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[14] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[14] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[14] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[14] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[14] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[14] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[14] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[15] 
STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testHasRoomForMethod[15] 
PASSED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[15] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > 
testFilterToPreservesLogAppendTime[15] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[15] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testFilterTo[15] PASSED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[15] STARTED

org.apache.kafka.common.record.MemoryRecordsTest > testIterator[15] PASSED

org.apache.kafka.common.serialization.SerializationTest > testSerdeFromUnknown 
STARTED

org.apache.kafka.common.serialization.SerializationTest > testSerdeFromUnknown 
PASSED

org.apache.kafka.common.serialization.SerializationTest > testDoubleSerializer 
STARTED


[jira] [Commented] (KAFKA-4569) Transient failure in org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable

2017-02-21 Thread Armin Braun (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15877598#comment-15877598
 ] 

Armin Braun commented on KAFKA-4569:


Just for future reference: I tried to debug a few issues around this and 
figured out that it is coming from the heartbeat thread.

Basically the heartbeat thread gets enabled without any delay in 
`org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup`
and then in some cases polls quickly enough to make 
`org.apache.kafka.clients.consumer.KafkaConsumer#pollOnce` break out here:

{code}
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.poll(time.milliseconds());

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());

    // if data is available already, return it immediately
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;
{code}

Not quite sure how to fix this, but you can reproduce it by running the test in 
an endless loop (about 2 ms per run; it fails after roughly 500 runs on average 
:)) -> red. It goes green once you comment out 
this enable section

{code}
        if (heartbeatThread != null)
            heartbeatThread.enable();
    }
}
{code}

in 
`org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup`
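
A rough sketch (not from the original comment) of the endless-loop reproduction described above, assuming JUnit 4 and the Kafka clients test sources on the classpath:

{code}
import org.junit.runner.JUnitCore;
import org.junit.runner.Request;
import org.junit.runner.Result;

// Rough repro harness for the flaky test: run the single test method over and
// over until the race with the heartbeat thread shows up.
public class WakeupFlakinessRepro {
    public static void main(String[] args) {
        Request request = Request.method(
                org.apache.kafka.clients.consumer.KafkaConsumerTest.class,
                "testWakeupWithFetchDataAvailable");
        for (int run = 1; ; run++) {
            Result result = new JUnitCore().run(request);
            if (!result.wasSuccessful()) {
                System.out.println("Failed after " + run + " runs");
                break;
            }
        }
    }
}
{code}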

> Transient failure in 
> org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable
> -
>
> Key: KAFKA-4569
> URL: https://issues.apache.org/jira/browse/KAFKA-4569
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Guozhang Wang
>Assignee: Umesh Chaudhary
>  Labels: newbie
> Fix For: 0.10.3.0
>
>
> One example is:
> https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/370/testReport/junit/org.apache.kafka.clients.consumer/KafkaConsumerTest/testWakeupWithFetchDataAvailable/
> {code}
> Stacktrace
> java.lang.AssertionError
>   at org.junit.Assert.fail(Assert.java:86)
>   at org.junit.Assert.fail(Assert.java:95)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumerTest.testWakeupWithFetchDataAvailable(KafkaConsumerTest.java:679)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.rules.ExpectedException$ExpectedExceptionStatement.evaluate(ExpectedException.java:239)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> 

[GitHub] kafka pull request #2582: MINOR: Fixed Non-Final Close Method + its Duplicat...

2017-02-21 Thread original-brownbear
GitHub user original-brownbear opened a pull request:

https://github.com/apache/kafka/pull/2582

MINOR: Fixed Non-Final Close Method + its Duplication

Trivial, but in my opinion it's well worth making the `close` method 
`final` here and also deleting that duplication (especially since it hides 
the fact that `close` is synchronized here).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/original-brownbear/kafka 
cleanup-nonfinal-close

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/kafka/pull/2582.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #2582


commit 49e2e3a2ff8940f2ac8489ed39492085ab063d82
Author: Armin Braun 
Date:   2017-02-22T05:49:52Z

MINOR: Fixed nonfinal closed method + duplication




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Resolved] (KAFKA-4757) Improve NetworkClient trace logging of request details

2017-02-21 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin resolved KAFKA-4757.
-
Resolution: Fixed

> Improve NetworkClient trace logging of request details
> --
>
> Key: KAFKA-4757
> URL: https://issues.apache.org/jira/browse/KAFKA-4757
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Onur Karaman
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0
>
>
> Two issues here:
> 1. Here's what NetworkClient now shows when processing a disconnection:
> {code}
> [2017-02-10 10:48:57,052] TRACE Cancelled request 
> org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 
> being disconnected (org.apache.kafka.clients.NetworkClient)
> {code}
> The log at one point was useful and actually showed the contents of the 
> request. For instance, with FetchRequest, you used to be able to see which 
> partitions were requested as well as the offsets and max bytes requested per 
> partition.
> It looks like InFlightRequest itself doesn't actually hold the request but 
> instead currently just holds the RequestHeader. We probably want to make 
> InFlightRequest hold the entire request to make the original request show up 
> in the logs.
> 2. Sometimes I see the following log:
> {code}
> [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Again not very insightful.
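
Not the actual patch, but a sketch of the direction the ticket suggests: keep a reference to the full request on the in-flight entry (an assumed extra field; the types below are stand-ins), so the disconnect path can log the request contents instead of the object's identity hash.

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch, not the KAFKA-4757 change itself. The Object fields stand
// in for the real request header and request body types.
final class InFlightEntrySketch {
    private static final Logger log = LoggerFactory.getLogger(InFlightEntrySketch.class);

    private final Object header;   // e.g. the request header
    private final Object request;  // assumed extra field holding the full request

    InFlightEntrySketch(Object header, Object request) {
        this.header = header;
        this.request = request;
    }

    void logCancelled(String nodeId) {
        // Relies on the request's toString() describing partitions, offsets, max bytes, etc.
        log.trace("Cancelled request {} with header {} due to node {} being disconnected",
                  request, header, nodeId);
    }
}
{code}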



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4757) Improve NetworkClient trace logging of request details

2017-02-21 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-4757:

Affects Version/s: 0.10.2.0

> Improve NetworkClient trace logging of request details
> --
>
> Key: KAFKA-4757
> URL: https://issues.apache.org/jira/browse/KAFKA-4757
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Onur Karaman
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0
>
>
> Two issues here:
> 1. Here's what NetworkClient now shows when processing a disconnection:
> {code}
> [2017-02-10 10:48:57,052] TRACE Cancelled request 
> org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 
> being disconnected (org.apache.kafka.clients.NetworkClient)
> {code}
> The log at one point was useful and actually showed the contents of the 
> request. For instance, with FetchRequest, you used to be able to see which 
> partitions were requested as well as the offsets and max bytes requested per 
> partition.
> It looks like InFlightRequest itself doesn't actually hold the request but 
> instead currently just holds the RequestHeader. We probably want to make 
> InFlightRequest hold the entire request to make the original request show up 
> in the logs.
> 2. Sometimes I see the following log:
> {code}
> [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Again not very insightful.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4757) Improve NetworkClient trace logging of request details

2017-02-21 Thread Jiangjie Qin (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jiangjie Qin updated KAFKA-4757:

Fix Version/s: 0.10.3.0

> Improve NetworkClient trace logging of request details
> --
>
> Key: KAFKA-4757
> URL: https://issues.apache.org/jira/browse/KAFKA-4757
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.0
>Reporter: Onur Karaman
>Assignee: Colin P. McCabe
> Fix For: 0.10.3.0
>
>
> Two issues here:
> 1. Here's what NetworkClient now shows when processing a disconnection:
> {code}
> [2017-02-10 10:48:57,052] TRACE Cancelled request 
> org.apache.kafka.clients.NetworkClient$InFlightRequest@52f759d7 due to node 0 
> being disconnected (org.apache.kafka.clients.NetworkClient)
> {code}
> The log at one point was useful and actually showed the contents of the 
> request. For instance, with FetchRequest, you used to be able to see which 
> partitions were requested as well as the offsets and max bytes requested per 
> partition.
> It looks like InFlightRequest itself doesn't actually hold the request but 
> instead currently just holds the RequestHeader. We probably want to make 
> InFlightRequest hold the entire request to make the original request show up 
> in the logs.
> 2. Sometimes I see the following log:
> {code}
> [2017-02-10 10:53:59,015] TRACE Sending {} to node 0. 
> (org.apache.kafka.clients.NetworkClient)
> {code}
> Again not very insightful.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[GitHub] kafka pull request #2563: Kafka 4757: NetworkClient should log request detai...

2017-02-21 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/kafka/pull/2563


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site pull request #45: Manual edits needed for 0.10.2 release

2017-02-21 Thread guozhangwang
Github user guozhangwang closed the pull request at:

https://github.com/apache/kafka-site/pull/45


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] kafka-site issue #45: Manual edits needed for 0.10.2 release

2017-02-21 Thread guozhangwang
Github user guozhangwang commented on the issue:

https://github.com/apache/kafka-site/pull/45
  
Merged.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


Re: [VOTE] KIP-111 Kafka should preserve the Principal generated by the PrincipalBuilder while processing the request received on socket channel, on the broker.

2017-02-21 Thread Mayuresh Gharat
Hi Jun,

Thanks for the comments.

I will mention in the KIP how this change doesn't affect the default
authorizer implementation.

Regarding "Currently, we log the principal name in the request log in
RequestChannel, which has the format of "principalType + SEPARATOR + name;".
It would be good if we can keep the same convention after this KIP. One way
to do that is to convert java.security.Principal to KafkaPrincipal for
logging the requests."
---> This would mean we have to create a new KafkaPrincipal on each
request. Would it be OK to just specify the name of the principal?
Is there any major reason we don't want to change the logging format?

Thanks,

Mayuresh
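
For reference, a minimal sketch of the conversion being discussed: wrap the channel's java.security.Principal in a KafkaPrincipal only when writing the request log, so the "principalType + SEPARATOR + name" format is preserved. Class and method names here are illustrative, not an agreed design.

{code}
import java.security.Principal;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

// Illustrative sketch: convert to KafkaPrincipal just for request logging so the
// log keeps its "principalType + SEPARATOR + name" convention. This allocates one
// small object per request, which is the cost questioned above.
final class PrincipalLogging {
    static KafkaPrincipal forRequestLog(Principal channelPrincipal) {
        if (channelPrincipal instanceof KafkaPrincipal)
            return (KafkaPrincipal) channelPrincipal;
        return new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channelPrincipal.getName());
    }
}
{code}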



On Mon, Feb 20, 2017 at 10:18 PM, Jun Rao  wrote:

> Hi, Mayuresh,
>
> Thanks for the updated KIP. A couple of more comments.
>
> 1. Do we convert java.security.Principal to KafkaPrincipal for
> authorization check in SimpleAclAuthorizer? If so, it would be useful to
> mention that in the wiki so that people can understand how this change
> doesn't affect the default authorizer implementation.
>
> 2. Currently, we log the principal name in the request log in
> RequestChannel, which has the format of "principalType + SEPARATOR +
> name;".
> It would be good if we can keep the same convention after this KIP. One way
> to do that is to convert java.security.Principal to KafkaPrincipal for
> logging the requests.
>
> Jun
>
>
> On Fri, Feb 17, 2017 at 5:35 PM, Mayuresh Gharat <
> gharatmayures...@gmail.com
> > wrote:
>
> > Hi Jun,
> >
> > I have updated the KIP. Would you mind taking another look?
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Fri, Feb 17, 2017 at 4:42 PM, Mayuresh Gharat <
> > gharatmayures...@gmail.com
> > > wrote:
> >
> > > Hi Jun,
> > >
> > > Sure sounds good to me.
> > >
> > > Thanks,
> > >
> > > Mayuresh
> > >
> > > On Fri, Feb 17, 2017 at 1:54 PM, Jun Rao  wrote:
> > >
> > >> Hi, Mani,
> > >>
> > >> Good point on using PrincipalBuilder for SASL. It seems that
> > >> PrincipalBuilder already has access to Authenticator. So, we could
> just
> > >> enable that in SaslChannelBuilder. We probably could do that in a
> > separate
> > >> KIP?
> > >>
> > >> Hi, Mayuresh,
> > >>
> > >> If you don't think there is a concrete use case for using
> > >> PrincipalBuilder in
> > >> kafka-acls.sh, perhaps we could do the simpler approach for now?
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >>
> > >> On Fri, Feb 17, 2017 at 12:23 PM, Mayuresh Gharat <
> > >> gharatmayures...@gmail.com> wrote:
> > >>
> > >> > @Manikumar,
> > >> >
> > >> > Can you give an example how you are planning to use
> PrincipalBuilder?
> > >> >
> > >> > @Jun
> > >> > Yes, that is right. To give a brief overview, we just extract the
> cert
> > >> and
> > >> > hand it over to a third party library for creating a Principal. So
> we
> > >> > cannot create a Principal from just a string.
> > >> > The main motive behind adding the PrincipalBuilder for kafka-acls.sh
> > was
> > >> > that someone else (who can generate a Principal from a map of properties,
> propertie,
> > >> >  for example) can use it.
> > >> > As I said, Linkedin is fine with not making any changes to
> > Kafka-acls.sh
> > >> > for now. But we thought that it would be a good improvement to the
> > tool
> > >> and
> > >> > it makes it more flexible and usable.
> > >> >
> > >> > Let us know your thoughts, if you would like us to make
> kafka-acls.sh
> > >> more
> > >> > flexible and usable and not limited to Authorizer coming out of the
> > box.
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Mayuresh
> > >> >
> > >> >
> > >> > On Thu, Feb 16, 2017 at 10:18 PM, Manikumar <
> > manikumar.re...@gmail.com>
> > >> > wrote:
> > >> >
> > >> > > Hi Jun,
> > >> > >
> > >> > > yes, we can just customize rules to send full principal name.  I
> was
> > >> > > just thinking to
> > >> > > use PrinciplaBuilder interface for implementing SASL rules also.
> So
> > >> that
> > >> > > the interface
> > >> > > will be consistent across protocols.
> > >> > >
> > >> > > Thanks
> > >> > >
> > >> > > On Fri, Feb 17, 2017 at 1:07 AM, Jun Rao 
> wrote:
> > >> > >
> > >> > > > Hi, Radai, Mayuresh,
> > >> > > >
> > >> > > > Thanks for the explanation. Good point on a pluggable authorizer
> > can
> > >> > > > customize how acls are added. However, earlier, Mayuresh was
> > saying
> > >> > that
> > >> > > in
> > >> > > > LinkedIn's customized authorizer, it's not possible to create a
> > >> > principal
> > >> > > > from string. If that's the case, will adding the principal
> builder
> > >> in
> > >> > > > kafka-acl.sh help? If the principal can be constructed from a
> > >> string,
> > >> > > > wouldn't it be simpler to just let kafka-acl.sh do authorization
> > >> based
> > >> > on
> > >> > > > that string name and not be aware of the principal builder? If
> you
> > >> > still
> > >> > > > think there is a need, perhaps you can add a more concrete use
> > case
> > >> > 

Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-02-21 Thread Mayuresh Gharat
Apurva has a point that can be documented for this config.

Overall, LGTM +1.

Thanks,

Mayuresh

On Tue, Feb 21, 2017 at 6:41 PM, Becket Qin  wrote:

> Hi Apurva,
>
> Yes, it is true that the request size might be much smaller if the batching
> is based on uncompressed size. I will let the users know about this. That
> said, in practice, this is probably fine. For example, at LinkedIn, our max
> message size is 1 MB, typically the compressed size would be 100 KB or
> larger, given that in most cases, there are many partitions, the request
> size would not be too small (typically around a few MB).
>
> At LinkedIn we do have some topics that have varying compression ratios. Those
> are usually topics shared by different services, so the data may differ a lot
> even though the messages are in the same topic and have similar fields.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Tue, Feb 21, 2017 at 6:17 PM, Apurva Mehta  wrote:
>
> > Hi Becket, Thanks for the kip.
> >
> > I think one of the risks here is that when compression estimation is
> > disabled, you could have much smaller batches than expected, and
> throughput
> > could be hurt. It would be worth adding this to the documentation of this
> > setting.
> >
> > Also, one of the rejected alternatives states that per topic estimations
> > would not work when the compression of individual messages is variable.
> > This is true in theory, but in practice one would expect Kafka topics to
> > have fairly homogenous data, and hence should compress evenly. I was
> > curious if you have data which shows otherwise.
> >
> > Thanks,
> > Apurva
> >
> > On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin 
> wrote:
> >
> > > Hi folks,
> > >
> > > I would like to start the discussion thread on KIP-126. The KIP propose
> > > adding a new configuration to KafkaProducer to allow batching based on
> > > uncompressed message size.
> > >
> > > Comments are welcome.
> > >
> > > The KIP wiki is following:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > > 126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> >
>



-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-02-21 Thread Becket Qin
Hi Apurva,

Yes, it is true that the request size might be much smaller if the batching
is based on uncompressed size. I will let the users know about this. That
said, in practice, this is probably fine. For example, at LinkedIn, our max
message size is 1 MB, typically the compressed size would be 100 KB or
larger, given that in most cases, there are many partitions, the request
size would not be too small (typically around a few MB).

At LinkedIn we do have some topics that have varying compression ratios. Those are
usually topics shared by different services, so the data may differ a lot
even though the messages are in the same topic and have similar fields.

Thanks,

Jiangjie (Becket) Qin
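
For readers skimming the thread, a small sketch of the existing producer knobs this KIP interacts with. All config names below are existing ones; the new uncompressed-size option itself is deliberately not named here, since its final name is defined in the KIP.

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

// Existing producer settings relevant to KIP-126. Today the accumulator fills a
// batch up to batch.size of (estimated) compressed bytes; the KIP proposes an
// option to batch on uncompressed size instead.
public class Kip126Context {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16 * 1024);         // 16 KB target batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024); // mirrors the 1 MB max message size above
        // With uncompressed-size batching and a 10:1 compression ratio, a full batch
        // is only ~1.6 KB on the wire, which is the throughput concern raised above.
        return props;
    }
}
{code}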


On Tue, Feb 21, 2017 at 6:17 PM, Apurva Mehta  wrote:

> Hi Becket, Thanks for the kip.
>
> I think one of the risks here is that when compression estimation is
> disabled, you could have much smaller batches than expected, and throughput
> could be hurt. It would be worth adding this to the documentation of this
> setting.
>
> Also, one of the rejected alternatives states that per topic estimations
> would not work when the compression of individual messages is variable.
> This is true in theory, but in practice one would expect Kafka topics to
> have fairly homogenous data, and hence should compress evenly. I was
> curious if you have data which shows otherwise.
>
> Thanks,
> Apurva
>
> On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin  wrote:
>
> > Hi folks,
> >
> > I would like to start the discussion thread on KIP-126. The KIP propose
> > adding a new configuration to KafkaProducer to allow batching based on
> > uncompressed message size.
> >
> > Comments are welcome.
> >
> > The KIP wiki is following:
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> > 126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-21 Thread Michael Pearce
Hi Jason,

Have converted the interface/api bullets into interface code snippets.

Agreed, the implementation won’t take too long. We have early versions already. 
I assume it would be more stabilised maybe a week before you think about 
merging? I was thinking we could then fork from your Confluent branch, keeping 
the KIP-82 changes in a patch file, and then re-fork from Apache once the final 
KIP-98 change is merged and apply the patch with last-minute changes.

Cheers
Mike
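
As an aside for readers of the archive, the kind of interface snippet being asked for looks roughly like the sketch below; the names are illustrative and not necessarily the final KIP-82 API.

{code}
// Illustrative only: the sort of code snippet requested in place of the wiki
// bullet list. Method names are placeholders, not necessarily the final KIP-82 API.
public interface Headers extends Iterable<Headers.Header> {

    interface Header {
        String key();
        byte[] value();
    }

    Headers add(String key, byte[] value);   // append a header
    Header lastHeader(String key);           // most recently added header for a key
    Iterable<Header> headers(String key);    // all headers with the given key
}
{code}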


On 22/02/2017, 00:56, "Jason Gustafson"  wrote:

Hey Michael,

Awesome. I have a minor request. The APIs are currently documented as a
wiki list. Would you mind adding a code snippet instead? It's a bit easier
to process.

How will be best to manage this, as we will obviously build off your KIP’s
> protocol changes, to avoid a merge hell, should we branch from your branch
> in the confluent repo or is it worth having a KIP-98 special branch in the
> apache git, that we can branch/fork from?


I was thinking about this also. Ideally we'd like to get the changes in as
close together as possible since we only want one magic bump and some users
deploy trunk. The level of effort to change the format for headers seems
not too high. Do you agree? My guess is that the KIP-98 message format
patch will take 2-3 weeks to review before we merge to trunk, so you could
hold off implementing until that patch has somewhat stabilized. That would
save some potential rebase pain.

-Jason


The information contained in this email is strictly confidential and for the 
use of the addressee only, unless otherwise indicated. If you are not the 
intended recipient, please do not read, copy, use or disclose to others this 
message or any attachment. Please also notify the sender by replying to this 
email or by telephone (+44(020 7896 0011) and then delete the email and any 
copies of it. Opinions, conclusion (etc) that do not relate to the official 
business of this company shall be understood as neither given nor endorsed by 
it. IG is a trading name of IG Markets Limited (a company registered in England 
and Wales, company number 04008957) and IG Index Limited (a company registered 
in England and Wales, company number 01190902). Registered address at Cannon 
Bridge House, 25 Dowgate Hill, London EC4R 2YA. Both IG Markets Limited 
(register number 195355) and IG Index Limited (register number 114059) are 
authorised and regulated by the Financial Conduct Authority.


Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-21 Thread Jun Rao
It seems that it's simpler and more consistent to avoid optional keys and
values. Not sure if it's worth squeezing every byte at the expense of
additional complexity. Other than that, +1 from me.

Also, since this is a large KIP, minor changes may arise as we start the
implementation. It would be good if we can keep the community posted of
those changes, if any.

Thanks,

Jun
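
For readers unfamiliar with the "uintVar"/"intVar" fields in the message-format sketch quoted further down this thread: a minimal, self-contained illustration of the protobuf-style variable-length encoding being referred to (base-128 varints, with zig-zag mapping for signed deltas). This is illustrative only, not Kafka's exact utility code.

{code}
import java.io.ByteArrayOutputStream;

// Minimal illustration of base-128 varints and zig-zag encoding, the general
// technique behind the "uintVar"/"intVar" fields in the message-format sketch.
public class VarintSketch {
    static void writeUnsignedVarint(int value, ByteArrayOutputStream out) {
        // Emit 7 bits per byte, setting the high bit while more bytes follow.
        while ((value & 0xFFFFFF80) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    static void writeVarint(int value, ByteArrayOutputStream out) {
        // Zig-zag: map small negative numbers to small unsigned numbers.
        writeUnsignedVarint((value << 1) ^ (value >> 31), out);
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(-3, out);          // zig-zag maps -3 to 5, one byte
        writeUnsignedVarint(300, out); // two bytes
        System.out.println("encoded length = " + out.size()); // prints 3
    }
}
{code}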

On Tue, Feb 21, 2017 at 4:33 PM, Michael Pearce 
wrote:

> If the argument and objective within this KIP is to keep the overhead of
> the protocol as small as possible and remove redundancy, with every byte
> being counted and varInts being introduced, then it would make sense to me
> to use attributes.
>
>
> On 22/02/2017, 00:14, "Jason Gustafson"  wrote:
>
> Done. I've left the key and value as optional since we may not have
> reached
> consensus on whether to use attributes or not. Perhaps we should just
> keep
> it simple and not do it? The benefit seems small.
>
> -Jason
>
> On Tue, Feb 21, 2017 at 4:05 PM, Michael Pearce  >
> wrote:
>
> > Ok, no worries, can you add it back ValueLen on this KIP, and update
> the
> > doc, then we can work from that ☺
> >
> > Cheers
> > Mike
> >
> > On 22/02/2017, 00:02, "Jason Gustafson"  wrote:
> >
> > I feel it was a little odd to leave out the value length anyway,
> so I
> > would
> > rather add it back and put headers at the end. This is more
> consistent
> > with
> > the rest of the Kafka protocol.
> >
> > -Jason
> >
> > On Tue, Feb 21, 2017 at 3:58 PM, Michael Pearce <
> michael.pea...@ig.com
> > >
> > wrote:
> >
> > > Or we keep as is (valuelen removed), and headers are added with
> > headers
> > > length..
> > >
> > > On 21/02/2017, 23:38, "Apurva Mehta" 
> wrote:
> > >
> > > Right now, we don't need the value length: since it is the
> last
> > item
> > > in the
> > > message, and we have the message length, we can deduce the
> value
> > > length.
> > > However, if we are adding record headers to the end, we
> would
> > need to
> > > introduce the value length along with that change.
> > >
> > > On Tue, Feb 21, 2017 at 3:34 PM, Michael Pearce <
> > michael.pea...@ig.com
> > > >
> > > wrote:
> > >
> > > > It seems I cannot add comment on the doc.
> > > >
> > > > In the section around the message protocol.
> > > >
> > > > It has stated:
> > > >
> > > > Message =>
> > > > Length => uintVar
> > > > Attributes => int8
> > > > TimestampDelta => intVar
> > > > OffsetDelta => uintVar
> > > > KeyLen => uintVar [OPTIONAL]
> > > > Key => data [OPTIONAL]
> > > > Value => data [OPTIONAL]
> > > >
> > > > Should it not be: (added missing value len)
> > > >
> > > > Message =>
> > > > Length => uintVar
> > > > Attributes => int8
> > > > TimestampDelta => intVar
> > > > OffsetDelta => uintVar
> > > > KeyLen => uintVar [OPTIONAL]
> > > > Key => data [OPTIONAL]
> > > > ValueLen => uintVar [OPTIONAL]
> > > > Value => data [OPTIONAL]
> > > >
> > > >
> > > >
> > > > On 21/02/2017, 23:07, "Joel Koshy" 
> > wrote:
> > > >
> > > > I left a couple of comments/questions directly on the
> > google-doc
> > > >  > > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8>
> > > > - I found it much more tractable for a proposal of
> this
> > size to
> > > > discuss in
> > > > context within the doc. The permissions on the doc
> don't
> > let
> > > everyone
> > > > view
> > > > comments, so if there are any material changes that
> come
> > out of
> > > the
> > > > discussions in those comment threads we can
> summarize here.
> > > >
> > > > Thanks,
> > > >
> > > > Joel
> > > >
> > > > On Mon, Feb 20, 2017 at 4:08 PM, Becket Qin <
> > > becket@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks for the explanation, Guozhang. That makes
> sense.
> > > > >
> > > > > On Sun, Feb 19, 2017 at 7:28 PM, Guozhang Wang <
> > > wangg...@gmail.com>
> > > > wrote:
>

[jira] [Updated] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4144:
---
Description: 
At the moment the timestamp extractor is configured via a {{StreamConfig}} 
value to {{KafkaStreams}}. That means you can only have a single timestamp 
extractor per app, even though you may be joining multiple streams/tables that 
require different timestamp extraction methods.

You should be able to specify a timestamp extractor via 
{{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
serdes that override the StreamConfig defaults.

Specifying a per-stream extractor should only be possible for sources, but not 
for intermediate topics. For PAPI we cannot enforce this, but for DSL 
{{through()}} should not allow to set a custom extractor by the user. In 
contrast, with regard to KAFKA-4785, is must internally set an extractor that 
returns the record's metadata timestamp in order to overwrite the global 
extractor from {{StreamsConfig}} (ie, set {{FailOnInvalidTimestampExtractor}}). 
This change should be done in KAFKA-4785 though.

  was:
At the moment the timestamp extractor is configured via a {{StreamConfig}} 
value to {{KafkaStreams}}. That means you can only have a single timestamp 
extractor per app, even though you may be joining multiple streams/tables that 
require different timestamp extraction methods.

You should be able to specify a timestamp extractor via 
{{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
serdes that override the StreamConfig defaults.

Specifying a per-stream extractor should only be possible for sources, but not 
for intermediate topics. For PAPI we cannot enforce this, but for DSL 
{{through()}} should not allow to set a custom extractor by the user. In 
contrast, with regard to KAFKA-4785, is must internally set an extractor that 
returns the records metadata timestamp in order to overwrite the global 
extractor from {{StreamsConfig}} (ie, set {{FailOnInvalidTimestampExtractor}}). 
This change should be done in KAFKA-4785 though.


> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, needs-kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For PAPI we cannot enforce this, but for DSL 
> {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.
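
For context, a sketch of today's application-wide setting versus the per-source override this ticket asks for. The config keys and the WallclockTimestampExtractor class below are existing names; the per-source overload is shown only in a comment and is hypothetical, since its exact shape is what the ticket/KIP will define.

{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.WallclockTimestampExtractor;

// Today: one timestamp extractor for the whole application, set via StreamsConfig.
// The ticket asks for a per-source override, sketched hypothetically below.
public class TimestampExtractorSketch {
    public static Properties globalConfig() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Global extractor, applied to every source topic.
        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  WallclockTimestampExtractor.class.getName());
        return props;
    }

    // Hypothetical shape of the proposed per-source API (not an existing overload):
    // builder.stream(myExtractor, keySerde, valueSerde, "clicks");
    // builder.table(otherExtractor, keySerde, valueSerde, "accounts", "accounts-store");
}
{code}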



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4785:
---
Description: 
Users can specify what timestamp extractor should be used to decode the 
timestamp of input topic records. As long as RecordMetadataTimestamp or 
WallclockTime is use this is fine. 

However, for custom timestamp extractors it might be invalid to apply this 
custom extractor to records received from internal repartitioning topics. The 
reason is that Streams sets the current "stream time" as record metadata 
timestamp explicitly before writing to intermediate repartitioning topics 
because this timestamp should be use by downstream subtopologies. A custom 
timestamp extractor might return something different breaking this assumption.

Thus, for reading data from intermediate repartitioning topic, the configured 
timestamp extractor should not be used, but the record's metadata timestamp 
should be extracted as record timestamp.

In order to leverage the same behavior for intermediate user topic (ie, used in 
{{through()}})  we can leverage KAFKA-4144 and internally set an extractor for 
those "intermediate sources" that returns the record's metadata timestamp in 
order to overwrite the global extractor from {{StreamsConfig}} (ie, set 
{{FailOnInvalidTimestampExtractor}}).

  was:
Users can specify what timestamp extractor should be used to decode the 
timestamp of input topic records. As long as RecordMetadataTimestamp or 
WallclockTime is use this is fine. 

However, for custom timestamp extractors it might be invalid to apply this 
custom extractor to records received from internal repartitioning topics. The 
reason is that Streams sets the current "stream time" as record metadata 
timestamp explicitly before writing to intermediate repartitioning topics 
because this timestamp should be use by downstream subtopologies. A custom 
timestamp extractor might return something different breaking this assumption.

Thus, for reading data from intermediate repartitioning topic, the configured 
timestamp extractor should not be used, but the record's metadata timestamp 
should be extracted as record timestamp.


> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different breaking this assumption.
> Thus, for reading data from intermediate repartitioning topic, the configured 
> timestamp extractor should not be used, but the record's metadata timestamp 
> should be extracted as record timestamp.
> In order to leverage the same behavior for intermediate user topic (ie, used 
> in {{through()}})  we can leverage KAFKA-4144 and internally set an extractor 
> for those "intermediate sources" that returns the record's metadata timestamp 
> in order to overwrite the global extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4144:
---
Description: 
At the moment the timestamp extractor is configured via a {{StreamConfig}} 
value to {{KafkaStreams}}. That means you can only have a single timestamp 
extractor per app, even though you may be joining multiple streams/tables that 
require different timestamp extraction methods.

You should be able to specify a timestamp extractor via 
{{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
serdes that override the StreamConfig defaults.

Specifying a per-stream extractor should only be possible for sources, but not 
for intermediate topics. For PAPI we cannot enforce this, but for DSL 
{{through()}} should not allow to set a custom extractor by the user. In 
contrast, with regard to KAFKA-4785, is must internally set an extractor that 
returns the records metadata timestamp in order to overwrite the global 
extractor from {{StreamsConfig}} (ie, set {{FailOnInvalidTimestampExtractor}}). 
This change should be done in KAFKA-4785 though.

  was:
At the moment the timestamp extractor is configured via a {{StreamConfig}} 
value to {{KafkaStreams}}. That means you can only have a single timestamp 
extractor per app, even though you may be joining multiple streams/tables that 
require different timestamp extraction methods.

You should be able to specify a timestamp extractor via 
{{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
serdes that override the StreamConfig defaults.

Specifying a per-stream extractor should only be possible for sources, but not 
for intermediate topics. For the PAPI we cannot enforce this, but for the DSL, 
{{through()}} should not allow the user to set a custom extractor. In 
contrast, with regard to KAFKA-4785, it must internally set an extractor that 
returns the record's metadata timestamp in order to overwrite the global 
extractor from {{StreamsConfig}} (ie, set {{FailOnInvalidTimestampExtractor}})


> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, needs-kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For the PAPI we cannot enforce this, but for the 
> DSL, {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set 
> {{FailOnInvalidTimestampExtractor}}). This change should be done in 
> KAFKA-4785 though.
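
A rough sketch of how such a per-source override might look from the user side. The builder overload shown in the trailing comment is hypothetical (the exact signature would be defined by a KIP), and the payload-as-timestamp assumption is made up for illustration:

{code}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Made-up extractor for illustration: assumes the record value itself is the
// event time in milliseconds.
public class PayloadTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(final ConsumerRecord<Object, Object> record) {
        return (Long) record.value();
    }
}

// Hypothetical per-source override, analogous to the existing per-source
// key/value serde overrides (not the final API):
// KStream<String, Long> stream =
//     builder.stream(new PayloadTimestampExtractor(), keySerde, valueSerde, "input-topic");
{code}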



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-126 - Allow KafkaProducer to batch based on uncompressed size

2017-02-21 Thread Apurva Mehta
Hi Becket, Thanks for the kip.

I think one of the risks here is that when compression estimation is
disabled, you could have much smaller batches than expected, and throughput
could be hurt. It would be worth adding this to the documentation of this
setting.
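
For context, a sketch of the existing producer settings this interacts with (only well-known configs are shown; the new switch proposed by KIP-126 is deliberately left out since its name is defined on the KIP wiki, and the broker address is a placeholder):

{code}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class Kip126ContextExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");  // batches are compressed
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);  // today, compared against the
                                                             // estimated compressed size
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // KIP-126 proposes batching on the uncompressed size instead; if the
        // estimation is effectively disabled, batches may end up much smaller
        // than batch.size on the wire, which is the throughput risk noted above.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ... send records ...
        }
    }
}
{code}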

Also, one of the rejected alternatives states that per-topic estimation
would not work when the compression of individual messages is variable.
This is true in theory, but in practice one would expect Kafka topics to
contain fairly homogeneous data, which should compress evenly. I was
curious whether you have data that shows otherwise.

Thanks,
Apurva

On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin  wrote:

> Hi folks,
>
> I would like to start the discussion thread on KIP-126. The KIP propose
> adding a new configuration to KafkaProducer to allow batching based on
> uncompressed message size.
>
> Comments are welcome.
>
> The KIP wiki is following:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
> 126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+size
>
> Thanks,
>
> Jiangjie (Becket) Qin
>


[jira] [Updated] (KAFKA-4144) Allow per stream/table timestamp extractor

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4144?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4144:
---
Description: 
At the moment the timestamp extractor is configured via a {{StreamConfig}} 
value to {{KafkaStreams}}. That means you can only have a single timestamp 
extractor per app, even though you may be joining multiple streams/tables that 
require different timestamp extraction methods.

You should be able to specify a timestamp extractor via 
{{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
serdes that override the StreamConfig defaults.

Specifying a per-stream extractor should only be possible for sources, but not 
for intermediate topics. For the PAPI we cannot enforce this, but for the DSL, 
{{through()}} should not allow the user to set a custom extractor. In 
contrast, with regard to KAFKA-4785, it must internally set an extractor that 
returns the record's metadata timestamp in order to overwrite the global 
extractor from {{StreamsConfig}} (ie, set {{FailOnInvalidTimestampExtractor}})

  was:
At the moment the timestamp extractor is configured via a StreamConfig value to 
KafkaStreams.  That means you can only have a single timestamp extractor per 
app, even though you may be joining multiple streams/tables that require 
different timestamp extraction methods.

You should be able to specify a timestamp extractor via 
KStreamBuilder.stream/table, just like you can specify key and value serdes 
that override the StreamConfig defaults.


> Allow per stream/table timestamp extractor
> --
>
> Key: KAFKA-4144
> URL: https://issues.apache.org/jira/browse/KAFKA-4144
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Elias Levy
>Assignee: Jeyhun Karimov
>  Labels: api, needs-kip
>
> At the moment the timestamp extractor is configured via a {{StreamConfig}} 
> value to {{KafkaStreams}}. That means you can only have a single timestamp 
> extractor per app, even though you may be joining multiple streams/tables 
> that require different timestamp extraction methods.
> You should be able to specify a timestamp extractor via 
> {{KStreamBuilder.stream()/table()}}, just like you can specify key and value 
> serdes that override the StreamConfig defaults.
> Specifying a per-stream extractor should only be possible for sources, but 
> not for intermediate topics. For the PAPI we cannot enforce this, but for the 
> DSL, {{through()}} should not allow the user to set a custom extractor. In 
> contrast, with regard to KAFKA-4785, it must internally set an extractor that 
> returns the record's metadata timestamp in order to overwrite the global 
> extractor from {{StreamsConfig}} (ie, set {{FailOnInvalidTimestampExtractor}})



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4785?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-4785:
---
Description: 
Users can specify what timestamp extractor should be used to decode the 
timestamp of input topic records. As long as RecordMetadataTimestamp or 
WallclockTime is used, this is fine. 

However, for custom timestamp extractors it might be invalid to apply this 
custom extractor to records received from internal repartitioning topics. The 
reason is that Streams sets the current "stream time" as record metadata 
timestamp explicitly before writing to intermediate repartitioning topics 
because this timestamp should be used by downstream subtopologies. A custom 
timestamp extractor might return something different, breaking this assumption.

Thus, when reading data from intermediate repartitioning topics, the configured 
timestamp extractor should not be used; instead, the record's metadata timestamp 
should be used as the record timestamp.

  was:
Users can specify what timestamp extractor should be used to decode the 
timestamp of input topic records. As long as RecordMetadataTimestamp or 
WallclockTime is used, this is fine. 

However, for custom timestamp extractors it might be invalid to apply this 
custom extractor to records received from internal repartitioning topics. The 
reason is that Streams sets the current "stream time" as record metadata 
timestamp explicitly before writing to intermediate repartitioning topics 
because this timestamp should be used by downstream subtopologies. A custom 
timestamp extractor might return something different, breaking this assumption.


> Records from internal repartitioning topics should always use 
> RecordMetadataTimestampExtractor
> --
>
> Key: KAFKA-4785
> URL: https://issues.apache.org/jira/browse/KAFKA-4785
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Matthias J. Sax
>
> Users can specify what timestamp extractor should be used to decode the 
> timestamp of input topic records. As long as RecordMetadataTimestamp or 
> WallclockTime is used, this is fine. 
> However, for custom timestamp extractors it might be invalid to apply this 
> custom extractor to records received from internal repartitioning topics. The 
> reason is that Streams sets the current "stream time" as record metadata 
> timestamp explicitly before writing to intermediate repartitioning topics 
> because this timestamp should be used by downstream subtopologies. A custom 
> timestamp extractor might return something different, breaking this assumption.
> Thus, when reading data from intermediate repartitioning topics, the configured 
> timestamp extractor should not be used; instead, the record's metadata timestamp 
> should be used as the record timestamp.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-4785) Records from internal repartitioning topics should always use RecordMetadataTimestampExtractor

2017-02-21 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-4785:
--

 Summary: Records from internal repartitioning topics should always 
use RecordMetadataTimestampExtractor
 Key: KAFKA-4785
 URL: https://issues.apache.org/jira/browse/KAFKA-4785
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.2.0
Reporter: Matthias J. Sax


Users can specify what timestamp extractor should be used to decode the 
timestamp of input topic records. As long as RecordMetadataTimestamp or 
WallclockTime is used, this is fine. 

However, for custom timestamp extractors it might be invalid to apply this 
custom extractor to records received from internal repartitioning topics. The 
reason is that Streams sets the current "stream time" as record metadata 
timestamp explicitly before writing to intermediate repartitioning topics 
because this timestamp should be used by downstream subtopologies. A custom 
timestamp extractor might return something different, breaking this assumption.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2594) Add a key-value store that is a fixed-capacity in-memory LRU cache

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2594?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2594:
---
Affects Version/s: 0.10.0.0

> Add a key-value store that is a fixed-capacity in-memory LRU cache 
> ---
>
> Key: KAFKA-2594
> URL: https://issues.apache.org/jira/browse/KAFKA-2594
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
> Fix For: 0.10.0.0
>
>
> The current {{KeyValueStore}} implementations are not limited in size, and 
> thus are less useful for some use cases. This subtask will add a simple 
> key-value store that maintains in memory at most a maximum number of entries 
> that were recently read or written. When the cache size reaches the capacity 
> and a new entry is to be added, the least recently used entry will be 
> automatically purged from the cache. This key-value store will extend 
> {{MeteredKeyValueStore}} for monitoring and recording of changes to a backing 
> topic, enabling recovery of the cache contents from the replicated state.
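
A minimal sketch of the fixed-capacity LRU idea using java.util.LinkedHashMap, purely to illustrate the eviction behavior described above (it is not the actual MeteredKeyValueStore-backed implementation):

{code}
import java.util.LinkedHashMap;
import java.util.Map;

// Evicts the least recently accessed entry once the capacity is exceeded.
public class SimpleLruCache<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    public SimpleLruCache(final int maxEntries) {
        super(16, 0.75f, true);   // accessOrder = true gives LRU iteration order
        this.maxEntries = maxEntries;
    }

    @Override
    protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
        // In the real store this hook would also need to cooperate with the
        // changelog topic so the contents can be restored after a failure.
        return size() > maxEntries;
    }
}
{code}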



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2649) Add support for custom partitioner in sink nodes

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2649?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2649:
---
Affects Version/s: 0.10.0.0

> Add support for custom partitioner in sink nodes
> 
>
> Key: KAFKA-2649
> URL: https://issues.apache.org/jira/browse/KAFKA-2649
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
> Fix For: 0.10.0.0
>
>
> The only way for Processor implementations to control partitioning of 
> forwarded messages is to set the partitioner class as property 
> {{ProducerConfig.PARTITIONER_CLASS_CONFIG}} in the StreamsConfig, which 
> should be set to the name of a 
> {{org.apache.kafka.clients.producer.Partitioner}} implementation. However, 
> doing this requires the partitioner knowing how to properly partition *all* 
> topics, not just the one or few topics used by the Processor.
> Instead, Kafka Streams should make it easy to optionally add a partitioning 
> function for each sink used in a topology. Each sink represents a single 
> output topic, and thus is far simpler to implement. Additionally, the sink is 
> already typed with the key and value types (via serdes for the keys and 
> values), so the partitioner can also be typed with the key and value 
> types. Finally, this also keeps the logic of choosing partitioning strategies 
> where it belongs, as part of building the topology.
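
A small sketch of a typed per-sink partitioner, assuming the StreamPartitioner interface with the (key, value, numPartitions) signature from this era of the API; the PageView type and the region-based scheme are made up for illustration:

{code}
import org.apache.kafka.streams.processor.StreamPartitioner;

// Hypothetical value type, for illustration only.
class PageView {
    final String region;
    PageView(final String region) { this.region = region; }
}

// Routes records by a field of the value instead of hashing on the message key.
public class RegionPartitioner implements StreamPartitioner<String, PageView> {
    @Override
    public Integer partition(final String key, final PageView value, final int numPartitions) {
        return Math.floorMod(value.region.hashCode(), numPartitions);
    }
}

// Usage, per the addSink(...) form described above (names are placeholders):
// builder.addSink("SINK", "page-views-by-region", new RegionPartitioner(), "PROCESS");
{code}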



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2593) KeyValueStores should not require use of the context's default serializers and deserializers

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2593?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2593:
---
Affects Version/s: 0.10.0.0

> KeyValueStores should not require use of the context's default serializers 
> and deserializers
> 
>
> Key: KAFKA-2593
> URL: https://issues.apache.org/jira/browse/KAFKA-2593
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Randall Hauch
>Assignee: Randall Hauch
> Fix For: 0.10.0.0
>
>
> Currently the {{InMemoryKeyValueStore}} is only able to use the key and value 
> serializers and deserializers (aka, "serdes") from the {{ProcessingContext}}. 
> This means that a {{Processor}} implementation that wants to use the 
> {{InMemoryKeyValueStore}} can only do this if the key and value types match 
> those set up as the default serdes in the topology's configuration.
> Additionally, the {{RocksDBKeyValueStore}} is only capable of {{byte[]}} keys 
> and values.
> Both of these key-value stores should allow the component using them to 
> specify the serdes for both the keys and values. As a convenience, the 
> current behavior should still be supported, as should a way to infer the 
> serdes for the "built-in" serializers and deserializers (e.g., strings, 
> integers, longs, and byte arrays).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2694) Make a task id be a composite id of a topic group id and a partition id

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2694?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2694:
---
Affects Version/s: 0.10.0.0

> Make a task id be a composite id of a topic group id and a partition id
> ---
>
> Key: KAFKA-2694
> URL: https://issues.apache.org/jira/browse/KAFKA-2694
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3153) Serializer/Deserializer Registration and Type inference

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3153:
---
Affects Version/s: 0.10.0.0

> Serializer/Deserializer Registration and Type inference
> ---
>
> Key: KAFKA-3153
> URL: https://issues.apache.org/jira/browse/KAFKA-3153
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> This changes the way serializers/deserializers are selected by the framework. 
> The new scheme requires the app dev to register serializers/deserializers for 
> types using an API. The framework infers the type of data from the topology and 
> uses the appropriate serializer/deserializer. This is best effort: type 
> inference is not always possible due to Java's type erasure. If a type cannot 
> be determined, user code can supply more information.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2706) Make state stores first class citizens in the processor DAG

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2706:
---
Affects Version/s: 0.10.0.0

> Make state stores first class citizens in the processor DAG
> ---
>
> Key: KAFKA-2706
> URL: https://issues.apache.org/jira/browse/KAFKA-2706
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2811) Add standby tasks

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2811?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2811:
---
Affects Version/s: 0.10.0.0

> Add standby tasks
> -
>
> Key: KAFKA-2811
> URL: https://issues.apache.org/jira/browse/KAFKA-2811
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Restoring local state from state change-log topics can be expensive. To 
> alleviate this, we want to have an option to keep replicas of local 
> state that are kept up to date. The task assignment logic should be aware of 
> the existence of such replicas.
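
In the released API this option is a single Streams config; a minimal sketch (the application id and broker address are placeholders):

{code}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
// Keep one warm replica of each task's local state on another instance, so a
// failed task can move without a full changelog restore.
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
{code}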



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2902) StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of getBaseConsumerConfigs

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2902:
---
Affects Version/s: 0.10.0.0

> StreamingConfig getConsumerConfiigs uses getRestoreConsumerConfigs instead of 
>  getBaseConsumerConfigs
> -
>
> Key: KAFKA-2902
> URL: https://issues.apache.org/jira/browse/KAFKA-2902
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 0.10.0.0
>
>
> When starting a KafkaStreaming instance the 
> StreamingConfig.getConsumerConfigs method uses the getRestoreConsumerConfigs 
> to retrieve properties. But this method removes the groupId property which 
> causes an error and the KafkaStreaming instance shuts down.  On 
> KafkaStreaming startup StreamingConfig should use getBaseConsumerConfigs 
> instead.
> Exception in thread "StreamThread-1" org.apache.kafka.common.KafkaException: 
> org.apache.kafka.common.errors.ApiException: The configured groupId is invalid
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:309)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:198)
> Caused by: org.apache.kafka.common.errors.ApiException: The configured 
> groupId is invalid 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2763) Reduce stream task migrations and initialization costs

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2763:
---
Affects Version/s: 0.10.0.0

> Reduce stream task migrations and initialization costs
> --
>
> Key: KAFKA-2763
> URL: https://issues.apache.org/jira/browse/KAFKA-2763
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Stream task assignment is not aware of either the previous task assignment or 
> local states of participating clients. By making the assignment logic aware 
> of them, we can reduce task migrations and initialization cost.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2984) KTable should send old values along with new values to downstreams

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2984:
---
Affects Version/s: 0.10.0.0

> KTable should send old values along with new values to downstreams
> --
>
> Key: KAFKA-2984
> URL: https://issues.apache.org/jira/browse/KAFKA-2984
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Old values are necessary for implementing aggregate functions. KTable should 
> augment an event with its old value. Basically KTable stream is a stream of 
> (key, (new value, old value)) internally. The old value may be omitted when 
> it is not used in the topology.
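
Conceptually, each downstream update is just a (new, old) pair per key; a minimal sketch of that idea (the class and field names are illustrative, not the actual internal types):

{code}
// Each downstream update carries both the new and the old value for a key.
public class ValueChange<V> {
    public final V newValue;   // null when the key is deleted
    public final V oldValue;   // null when the key is first inserted

    public ValueChange(final V newValue, final V oldValue) {
        this.newValue = newValue;
        this.oldValue = oldValue;
    }
}

// An aggregate can then be maintained incrementally, e.g.:
//   agg = adder.apply(key, change.newValue, agg);
//   agg = subtractor.apply(key, change.oldValue, agg);   // undo the old contribution
{code}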



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2727) initialize only the part of the topology relevant to the task

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2727:
---
Affects Version/s: 0.10.0.0

> initialize only the part of the topology relevant to the task
> -
>
> Key: KAFKA-2727
> URL: https://issues.apache.org/jira/browse/KAFKA-2727
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Currently each streaming task initializes the entire topology regardless of 
> the assigned topic-partitions. This is wasteful especially when the topology 
> has local state stores. All local state stores are restored from their change 
> log topics even when they are not actually used in the task execution. To fix 
> this, the task initialization should be aware of the relevant subgraph of the 
> topology and initializes only processors and state stores in the subgraph.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3060) Refactor MeteredXXStore

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3060:
---
Affects Version/s: 0.10.0.0

> Refactor MeteredXXStore
> ---
>
> Key: KAFKA-3060
> URL: https://issues.apache.org/jira/browse/KAFKA-3060
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> ** copied from a github comment by Guozhang Wang **
> The original motivation for having the MeteredXXStore is to wrap all metrics / 
> logging semantics in one place so they do not need to be re-implemented 
> again, but this now seems to be an obstacle with the current pattern. For 
> example, MeteredWindowStore.putAndReturnInternalKey is only used for logging, 
> and MeteredWindowStore.putInternal / MeteredWindowStore.getInternal are never 
> used since only the inner store will trigger these functions. So how about 
> refactoring this piece as follows:
> 1. WindowStore only exposes two APIs: put(K, V) and get(K, long).
> 2. Add a RollingRocksDBStores that does not extend any interface, but only 
> implements putInternal, getInternal and putAndReturnInternalKey, using the 
> underlying RocksDBStore as Segments.
> 3. RocksDBWindowStore implements WindowStore with a RollingRocksDBStores 
> inner.
> 4. Let MeteredXXStore only maintain the metrics recording logic, and let 
> different stores implement their own logging logic, since this now differs 
> across store types and is better handled separately. Also, some types of 
> stores may not even have a loggingEnabled flag, if they will always log or 
> never log.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3016) Add KStream-KStream window joins

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3016:
---
Affects Version/s: 0.10.0.0

> Add KStream-KStream window joins
> 
>
> Key: KAFKA-3016
> URL: https://issues.apache.org/jira/browse/KAFKA-3016
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2872) Error starting KafkaStream caused by sink not being connected to parent source/processor nodes

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2872:
---
Affects Version/s: 0.10.0.0

> Error starting KafkaStream caused by sink not being connected to parent 
> source/processor nodes
> --
>
> Key: KAFKA-2872
> URL: https://issues.apache.org/jira/browse/KAFKA-2872
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 0.10.0.0
>
>
> When starting the KafkaStream I get the following Exception:
> Exception in thread "main" java.util.NoSuchElementException: id: SINK
>   at 
> org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.(StreamThread.java:139)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.(StreamThread.java:120)
>   at 
> org.apache.kafka.streams.KafkaStreaming.(KafkaStreaming.java:110)
>   at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> The TopologyBuilder is being built like so:
> topologyBuilder.addSource("SOURCE", new StringDeserializer(), new 
> StringDeserializer(), "src-topic")
> .addProcessor("PROCESS", new 
> GenericProcessorClient(replaceVowels), "SOURCE")
> .addSink("SINK", "dest-topic", new StringSerializer(), new 
> StringSerializer(), "PROCESS");
> Looks to me the cause of the error is that in the TopologyBuilder.addSink method 
> the sink is never connected with its parent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2856) add KTable

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2856:
---
Affects Version/s: 0.10.0.0

> add KTable
> --
>
> Key: KAFKA-2856
> URL: https://issues.apache.org/jira/browse/KAFKA-2856
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> KTable is a special type of the stream that represents a changelog of a 
> database table (or a key-value store).
> A changelog has to meet the following requirements.
> * Key-value mapping is surjective in the database table (the key must be the 
> primary key).
> * All insert/update/delete events are delivered in order for the same key
> * An update event has the whole data (not just a delta).
> * A delete event is represented by the null value.
> KTable is not necessarily materialized as a local store. It may be 
> materialized when necessary. (see below)
> KTable supports look-up by key. KTable is materialized implicitly when 
> look-up is necessary.
> * KTable may be created from a topic. (Base KTable)
> * KTable may be created from another KTable by filter(), filterOut(), 
> mapValues(). (Derived KTable)
> * A call to the user supplied function is skipped when the value is null 
> since such an event represents a deletion. 
> * Instead of dropping, events filtered out by filter() or filterOut() are 
> converted to delete events. (Can we avoid this?)
> * map(), flatMap() and flatMapValues() are not supported since they may 
> violate the changelog requirements
> A derived KTable may be persisted to a topic by to() or through(). through() 
> creates another base KTable. 
> KTable can be converted to KStream by the toStream() method.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3278) clientId is not unique in producer/consumer registration leads to mbean warning

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3278:
---
Affects Version/s: 0.10.0.0

> clientId is not unique in producer/consumer registration leads to mbean 
> warning
> ---
>
> Key: KAFKA-3278
> URL: https://issues.apache.org/jira/browse/KAFKA-3278
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: Mac OS
>Reporter: Tom Dearman
>Assignee: Tom Dearman
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> The clientId passed through to StreamThread is not unique and this is used to 
> create consumers and producers, which in turn try to register mbeans; this 
> leads to a warning that the mbean is already registered.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3195) Transient test failure in OffsetCheckpointTest.testReadWrite

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3195:
---
Affects Version/s: 0.10.0.0

> Transient test failure in OffsetCheckpointTest.testReadWrite
> 
>
> Key: KAFKA-3195
> URL: https://issues.apache.org/jira/browse/KAFKA-3195
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Ewen Cheslack-Postava
>Assignee: Ismael Juma
> Fix For: 0.10.0.0
>
>
> It looks like it's probably an issue with parallel tests trying to access the 
> same fixed path, where one test deletes the file. Saw this on 
> 86a9036a7b03c8ae07d014c25a5eedc315544139.
> {quote}
> org.apache.kafka.streams.state.internals.OffsetCheckpointTest > testReadWrite 
> FAILED
> java.io.FileNotFoundException: 
> /tmp/kafka-streams/offset_checkpoint.test.tmp (No such file or directory)
> at java.io.FileOutputStream.open(Native Method)
> at java.io.FileOutputStream.(FileOutputStream.java:221)
> at java.io.FileOutputStream.(FileOutputStream.java:171)
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:68)
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpointTest.testReadWrite(OffsetCheckpointTest.java:48)
> {quote}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3337) Extract selector as a separate groupBy operator for KTable aggregations

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3337?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3337:
---
Affects Version/s: 0.10.0.0

> Extract selector as a separate groupBy operator for KTable aggregations
> ---
>
> Key: KAFKA-3337
> URL: https://issues.apache.org/jira/browse/KAFKA-3337
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>  Labels: api, newbie++
> Fix For: 0.10.0.0
>
>
> Currently KTable aggregation takes a selector used for selecting the 
> aggregate key and an aggregator for aggregating the values with the same 
> selected key, which makes the function a little bit "heavy":
> {code}
> table.groupBy(initializer, adder, substractor, selector, /* optional serde*/);
> {code}
>  It is better to extract the selector in a separate groupBy function such that
> {code}
> KTableGrouped KTable#groupBy(selector);
> KTable KTableGrouped#aggregate(initializer, adder, substractor, /* optional 
> serde*/);
> {code}
> Note that "KTableGrouped" only have APIs for aggregate and reduce, and none 
> else. So users have to follow the pattern below:
> {code}
> table.groupBy(...).aggregate(...);
> {code}
> This pattern is more natural for users who are familiar with SQL / Pig or 
> Spark DSL, etc.
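
A hedged sketch of how the proposed two-step pattern might read with lambdas; the generic parameters and the Order accessors are guesses for illustration (the archive also stripped the angle brackets from the snippets above), not the final API:

{code}
// Hypothetical usage of the proposed API: first select the grouping key and
// value, then aggregate within the grouped table.
KTable<String, Long> revenuePerRegion = orders        // orders: KTable<String, Order>
    .groupBy((orderId, order) -> KeyValue.pair(order.region(), order.amount()))
    .aggregate(
        () -> 0L,                                      // initializer
        (region, amount, total) -> total + amount,     // adder
        (region, amount, total) -> total - amount,     // subtractor
        "revenue-per-region-store");                   // store name / optional serde
{code}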



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3338) Add print and writeAsText functions to the Streams DSL

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3338:
---
Affects Version/s: 0.10.0.0

> Add print and writeAsText functions to the Streams DSL
> --
>
> Key: KAFKA-3338
> URL: https://issues.apache.org/jira/browse/KAFKA-3338
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Bill Bejeck
>  Labels: api, newbie++
> Fix For: 0.10.0.0
>
>
> We want to provide some REPL-like pattern for users for better debuggability. 
> More concretely, we want to allow users to easily inspect their intermediate 
> data streams in the topology while running the application. Theoretically 
> this can be done by using a break point, or by calling System.out.println() 
> inside the operator, or through finer-grained trace-level logging. But a more 
> user-friendly API would be to add a print() function to the KStream / KTable 
> object like:
> {code}
> // Prints the elements in this stream to the stdout, i.e. "System.out" of the 
> JVM
> KStream#print(/* optional serde */);  
> KTable#print(/* optional serde */);  
> // Writes the stream as text file(s) to the specified location.
> KStream#writeAsText(String filePath, /* optional serde */);
> KTable#writeAsText(String filePath, /* optional serde */);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3430) Allow users to set key in KTable.toStream() and KStream

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3430:
---
Affects Version/s: 0.10.0.0

> Allow users to set key in KTable.toStream() and KStream
> ---
>
> Key: KAFKA-3430
> URL: https://issues.apache.org/jira/browse/KAFKA-3430
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Bill Bejeck
>  Labels: api
> Fix For: 0.10.0.0
>
>
> Currently KTable.toStream does not take any parameters and hence users who 
> want to set the key need to do two steps:
> {code}table.toStream().map(...){code} in order to do so. We can make it 
> one step by providing the mapper parameter in toStream.
> And similarly, today users usually need to call {code} KStream.map() {code} in 
> order to select the key before an aggregation-by-key operation if the original 
> stream does not contain keys. 
> We can consider adding a specific function in KStream to do so:
> {code}KStream.selectKey(mapper){code}
> which essential is the same as
> {code}KStream.map(/* mapper that does not change the value, but only the key 
> */){code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3439) Document possible exception thrown in public APIs

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3439?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3439:
---
Affects Version/s: 0.10.0.0

> Document possible exception thrown in public APIs
> -
>
> Key: KAFKA-3439
> URL: https://issues.apache.org/jira/browse/KAFKA-3439
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: api, docs
> Fix For: 0.10.0.0
>
>
> Candidate interfaces include all the ones in "kstream", "processor" and 
> "state" packages.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3519) Refactor Transformer templates to return the same strong-typed value.

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3519?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3519:
---
Affects Version/s: 0.10.0.0

> Refactor Transformer templates to return the same strong-typed value.
> -
>
> Key: KAFKA-3519
> URL: https://issues.apache.org/jira/browse/KAFKA-3519
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: api
> Fix For: 0.10.0.0
>
>
> Currently the Transformer interface's template is:
> {code}
> R transform(K key, V value);
> {code}
> While all its usages require a key-value pair, and hence the object is 
> constructed as:
> {code}
> new Transformer>
> {code}
> Since we expect Transformer to be only used in the Streams DSL, which is 
> supposed to return another key-value pair streams, it's better to define its 
> template just as
> {code}
> public interface Transformer {
> KeyValue transform(K key, V value);
> }
> {code}
> Also, we can allow punctuate() to also return a nullable KeyValue 
> pair, and let the implementation forward to the downstream processor only when 
> the returned pair is not null.
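
The angle brackets in the snippets above were stripped by the archive; a hedged reconstruction of what the proposed template presumably looks like (the generic parameter names are a guess):

{code}
import org.apache.kafka.streams.KeyValue;

// Current template (roughly): an arbitrary return type R.
//   public interface Transformer<K, V, R> {
//       R transform(K key, V value);
//   }

// Proposed template: always return a key-value pair so the DSL can forward it.
public interface Transformer<K, V, K1, V1> {

    KeyValue<K1, V1> transform(K key, V value);

    // punctuate() could likewise return a nullable KeyValue<K1, V1>; the caller
    // would forward downstream only when the returned pair is not null.
}
{code}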



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3477) Add customizable StreamPartition into #to functions of Streams DSL

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3477?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3477:
---
Affects Version/s: 0.10.0.0

> Add customizable StreamPartition into #to functions of Streams DSL
> --
>
> Key: KAFKA-3477
> URL: https://issues.apache.org/jira/browse/KAFKA-3477
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>  Labels: newbie
> Fix For: 0.10.0.0
>
>
> In the lower-level Processor API we allow users to pass in a customizable 
> StreamPartitioner when creating a new sink processor node to the topology:
> {code}
> builder.addSink(String name, String topic, StreamPartitioner partitioner, 
> String... parentNames);
> {code}
> This StreamPartitioner allows users to specify any partitioning schemes based 
> on record values instead of using the default behavior of hashing on the 
> message key; but it is not exposed in the higher-level Streams DSL.
> We can add this parameter to the Streams DSL as well:
> {code}
> KStream#to(String topic, StreamPartitioner partitioner);
> KTable#to(String topic, StreamPartitioner partitioner);
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3440) Add Javadoc for KTable (changelog stream) and KStream (record stream)

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3440:
---
Affects Version/s: 0.10.0.0

> Add Javadoc for KTable (changelog stream) and KStream (record stream)
> -
>
> Key: KAFKA-3440
> URL: https://issues.apache.org/jira/browse/KAFKA-3440
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>  Labels: docs
> Fix For: 0.10.0.0
>
>
> Currently we only have a 1-liner in {code}KTable{code} and 
> {code}KStream{code} class describing the changelog and record streams. We'd 
> better have a more detailed explanation as in the web docs in Javadocs as 
> well.
> Also we want to have some more description in windowed {code}KTable{code}.
> As a side task: in many classes, method JavaDoc lacks the {{@return}} tag, 
> which should always be used.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3505) Set curRecord in punctuate() functions

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3505:
---
Affects Version/s: 0.10.0.0

> Set curRecord in punctuate() functions
> --
>
> Key: KAFKA-3505
> URL: https://issues.apache.org/jira/browse/KAFKA-3505
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.0.0
>
>
> Punctuate() function in processor and transformer needs to be handled a bit 
> differently from process(), since it can generate new records to pass through 
> the topology from anywhere in the topology, whereas in the latter case a 
> record is always polled from Kafka and passed via the source processors.
> Today because we do not set the curRecord correctly, calls to timestamp() / 
> topic() / etc would actually trigger a KafkaStreamsException.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3499) byte[] should not be used as Map key nor Set member

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3499:
---
Affects Version/s: 0.10.0.0

> byte[] should not be used as Map key nor Set member
> ---
>
> Key: KAFKA-3499
> URL: https://issues.apache.org/jira/browse/KAFKA-3499
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: josh gruenberg
>Assignee: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.0.0
>
>
> On the JVM, Array.equals and Array.hashCode do not incorporate array 
> contents; they inherit Object.equals/hashCode. This implies that Collections 
> that rely upon equals/hashCode (eg, HashMap/HashSet and variants) treat two 
> arrays with equal contents as distinct elements.
> Many of the Kafka Streams internal classes currently use generic HashMaps and 
> Sets to manage caches and invalidation status. For example, 
> RocksDBStore.cacheDirtyKeys is a HashSet. Then, in RocksDBWindowStore, the 
> Elements are constructed as RocksDBStore.
> Similarly, the MemoryLRUCache internally holds a 
> LinkedHashMap map, and a HashSet keys, and these end up holding 
> byte[] keys. Finally, user-code may attempt to use any of these provided 
> types with byte[], with undesirable results.
> Keys that are byte-arrays should be wrapped in a type that incorporates the 
> content in their computation of equals/hashCode. java.nio.ByteBuffer is one 
> such type that could be used, but a purpose-built immutable class would 
> likely be a better solution.
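
A short illustration of the underlying Java behavior and the ByteBuffer workaround mentioned above:

{code}
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;

public class ByteArrayKeyDemo {
    public static void main(String[] args) {
        byte[] a = {1, 2, 3};
        byte[] b = {1, 2, 3};

        Set<byte[]> raw = new HashSet<>();
        raw.add(a);
        // false: byte[] inherits identity-based equals/hashCode from Object,
        // so two arrays with equal contents are treated as distinct keys.
        System.out.println(raw.contains(b));

        Set<ByteBuffer> wrapped = new HashSet<>();
        wrapped.add(ByteBuffer.wrap(a));
        // true: ByteBuffer.equals/hashCode incorporate the buffer contents.
        System.out.println(wrapped.contains(ByteBuffer.wrap(b)));
    }
}
{code}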



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3504) Changelog partition configured to enable log compaction

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3504?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3504:
---
Affects Version/s: 0.10.0.0

> Changelog partition configured to enable log compaction
> ---
>
> Key: KAFKA-3504
> URL: https://issues.apache.org/jira/browse/KAFKA-3504
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: user-experience
> Fix For: 0.10.0.0
>
>
> Today Kafka Streams automatically configured changelog topics for state 
> stores, however these changelog topics are not configured as log compaction 
> enabled. We should set the right configs when auto-creating these internal 
> topics.
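
"Log compaction enabled" boils down to the standard topic-level setting; a minimal sketch of the config the auto-created changelog topics should carry (shown as plain topic config, not the internal topic-creation code):

{code}
import java.util.Properties;

// Changelog topics back a key-value store, so only the latest value per key
// needs to be retained; compaction keeps exactly that.
Properties changelogConfig = new Properties();
changelogConfig.put("cleanup.policy", "compact");
{code}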



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3512) Add a foreach() operator in Kafka Streams DSL

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3512:
---
Affects Version/s: 0.10.0.0

> Add a foreach() operator in Kafka Streams DSL
> -
>
> Key: KAFKA-3512
> URL: https://issues.apache.org/jira/browse/KAFKA-3512
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: api
> Fix For: 0.10.0.0
>
>
> This would be a more intuitive operator to replace the mis-usage of map():
> {code}
> map((k, v) -> process(k, v))
> {code}
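
The replacement would read roughly as follows (process() is just a stand-in for user code):

{code}
// foreach is a terminal operation with no return value, which makes the intent
// clearer than a map() whose result is discarded.
stream.foreach((k, v) -> process(k, v));
{code}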



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3521) Better handling NPEs in Streams DSL implementation

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3521:
---
Affects Version/s: 0.10.0.0

> Better handling NPEs in Streams DSL implementation
> --
>
> Key: KAFKA-3521
> URL: https://issues.apache.org/jira/browse/KAFKA-3521
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: user-experience
> Fix For: 0.10.0.0
>
>
> We observed a few cases where a mal-programmed application would trigger an NPE 
> thrown from some lower-level classes, where the inputs should really have been 
> validated and more meaningful exceptions thrown if the validation fails.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3598) Improve JavaDoc of public API

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3598?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3598:
---
Affects Version/s: 0.10.0.0

> Improve JavaDoc of public API
> -
>
> Key: KAFKA-3598
> URL: https://issues.apache.org/jira/browse/KAFKA-3598
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Trivial
>  Labels: docs
> Fix For: 0.10.0.0
>
>
> Add missing JavaDoc to all {{public}} methods of public API. Is related to 
> KAFKA-3440 and KAFKA-3574.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3599) Move WindowStoreUtils to package "internals"

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3599?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3599:
---
Affects Version/s: 0.10.0.0

> Move WindowStoreUtils to package "internals"
> 
>
> Key: KAFKA-3599
> URL: https://issues.apache.org/jira/browse/KAFKA-3599
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Minor
>  Labels: api
> Fix For: 0.10.0.0
>
>
> - remove {{WindowStoreUtils}} from public API, ie, move to sub-package 
> {{internals}}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3658) Incorrect validation check on maintenance period with join window size

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3658:
---
Affects Version/s: 0.10.0.0

> Incorrect validation check on maintenance period with join window size
> --
>
> Key: KAFKA-3658
> URL: https://issues.apache.org/jira/browse/KAFKA-3658
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: architecture
> Fix For: 0.10.0.0
>
>
> As [~h...@pinterest.com] found out, the current validation check of 
> {{KStreamJoinWindow}} requires the retention period to be at least twice 
> the join window size. This check was originally meant to make the segment 
> interval larger than the join window size, but for windowed 
> stream-stream joins this is not necessary.
> More specifically, for example with a window size of 6, a retention period of 12, 
> and 5 segments, the segment size will be set to 3. This means after time 
> 12, the first segment [0, 3) will be dropped; then at time 13, a late 
> record with timestamp 1 will not be accepted into the window store, and will 
> not participate in the join either.
> The proposed change is to only require the retention period to be > window size, 
> not window size * 2.
> cc [~ymatsuda]



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3639) Configure default serdes passed via StreamsConfig

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3639?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3639:
---
Affects Version/s: 0.10.0.0

> Configure default serdes passed via StreamsConfig
> -
>
> Key: KAFKA-3639
> URL: https://issues.apache.org/jira/browse/KAFKA-3639
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: api
> Fix For: 0.10.0.0
>
>
> For default serde classes passed via configs, their {{configure()}} functions 
> are not triggered before use. This makes such default serdes unusable, for 
> example an AvroSerializer where users may need to pass in a schema registry 
> client. We need to provide an interface where users can pass in the 
> key-value map configs for the default serde classes.
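
For illustration, a sketch of a serializer whose configure() genuinely needs the configs, which is why skipping that call breaks it; the class name and the schema.registry.url key are stand-ins, not the real Avro serializer:

{code}
import java.util.Map;
import org.apache.kafka.common.serialization.Serializer;

// Minimal sketch: a serializer that cannot work until configure() has run.
public class ConfiguredAvroLikeSerializer implements Serializer<Object> {
    private String registryUrl;

    @Override
    public void configure(final Map<String, ?> configs, final boolean isKey) {
        // Without this call the field stays null and serialize() fails.
        this.registryUrl = (String) configs.get("schema.registry.url");
    }

    @Override
    public byte[] serialize(final String topic, final Object data) {
        if (registryUrl == null) {
            throw new IllegalStateException("configure() was never called");
        }
        return new byte[0];   // a real implementation would talk to the registry
    }

    @Override
    public void close() {}
}
{code}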



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3607) Close state stores explicitly in unit tests upon completing

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3607:
---
Affects Version/s: 0.10.0.0

> Close state stores explicitly in unit tests upon completing
> ---
>
> Key: KAFKA-3607
> URL: https://issues.apache.org/jira/browse/KAFKA-3607
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>  Labels: test
> Fix For: 0.10.0.0
>
>
> In places like {{KStreamTestDriver}}, one or more state stores can be created 
> and initialized, but they are not explicitly closed at the end of the test. 
> We suspect that it may be the cause of the underlying RocksDB "pure virtual 
> method called" exception.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3523) Capture org.apache.kafka.clients.consumer.CommitFailedException in UncaughtExceptionHandler

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3523:
---
Affects Version/s: 0.10.0.0

> Capture org.apache.kafka.clients.consumer.CommitFailedException in 
> UncaughtExceptionHandler
> ---
>
> Key: KAFKA-3523
> URL: https://issues.apache.org/jira/browse/KAFKA-3523
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Assignee: Eno Thereska
>  Labels: newbie, user-experience
> Fix For: 0.10.0.0
>
>
> When the sync commit fails due to an ongoing rebalance, the exception is thrown 
> all the way up to the main thread and causes the whole Kafka Streams application 
> to stop, even if users set an UncaughtExceptionHandler. We need to be able to 
> catch this exception in that handler as well.
> Example stack trace (with UncaughtExceptionHandler set, but not been able to 
> capture this exception):
> {code}
> [2016-04-06 17:49:33,891] WARN Failed to commit StreamTask #0_0 in thread 
> [StreamThread-1]:  
> (org.apache.kafka.streams.processor.internals.StreamThread:485)
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be 
> completed since the group has already rebalanced and assigned the partitions 
> to another member. This means that the time between subsequent calls to 
> poll() was longer than the configured session.timeout.ms, which typically 
> implies that the poll loop is spending too much time message processing. You 
> can address this either by increasing the session timeout or by reducing the 
> maximum size of batches returned in poll() with max.poll.records.
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:567)
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:508)
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:659)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3629) KStreamImpl.to(...) throws NPE when the value SerDe is null

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3629:
---
Affects Version/s: 0.10.0.0

> KStreamImpl.to(...) throws NPE when the value SerDe is null
> ---
>
> Key: KAFKA-3629
> URL: https://issues.apache.org/jira/browse/KAFKA-3629
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Damian Guy
>Assignee: Guozhang Wang
> Fix For: 0.10.0.0
>
>
> On line 301 of KStreamImpl the null check is against the keySerde rather than 
> the valSerde. This results in an NPE if the valSerde is null 
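
Illustratively, the bug is the classic copy-and-paste null guard; the helper below is a paraphrase for clarity, not the actual source of KStreamImpl:

{code}
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

public class NullSerdeGuard {
    // Buggy shape was roughly: keySerde == null ? null : valSerde.serializer(),
    // which NPEs exactly when valSerde is null. The guard must test the serde
    // that is actually dereferenced:
    static <K, V> Serializer<V> valueSerializer(final Serde<K> keySerde, final Serde<V> valSerde) {
        return valSerde == null ? null : valSerde.serializer();
    }
}
{code}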



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3862) Kafka streams documentation: partition<->thread<->task assignment unclear

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3862:
---
Affects Version/s: 0.10.0.0

> Kafka streams documentation: partition<->thread<->task assignment unclear
> -
>
> Key: KAFKA-3862
> URL: https://issues.apache.org/jira/browse/KAFKA-3862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: N/A
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> Hi. Not sure if this is the right place to post about documentation (this 
> isn't actually a software bug).
> I believe the following span of documentation is self-contradictory:
> !http://i.imgur.com/WM8ada2.png!
> In the paragraph text before and after the image, it says the partitions are 
> *evenly distributed* among the threads. However, in the image, Thread 1 gets 
> p1 and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
> That's "even" in the sense that you can't do any better, but still not even, 
> and in conflict with the text that follows the image:
> !http://i.imgur.com/xq3SOre.png!
> This doesn't make sense: why would p2 from topicA and topicB go to 
> different threads? If this were the case, you couldn't join the partitions, 
> right?
> Either way, it seems like the two things are in conflict.
> Finally, the wording:
> bq. ...assume that each thread executes a single stream task...
> What does "assume" mean here? I've not seen any indication elsewhere in the 
> documentation that shows you can configure this behavior. Sorry if I missed 
> this.
> Thank you!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3619) State lock file handle leaks

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3619:
---
Affects Version/s: 0.10.0.0

> State lock file handle leaks
> 
>
> Key: KAFKA-3619
> URL: https://issues.apache.org/jira/browse/KAFKA-3619
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> The .lock files in the state store directories do not seem to be having their 
> file handles freed (despite the locks being freed), so on a complex job the 
> number of file handles in use goes up rapidly as the locks are taken for the 
> cleanup routine at the end of the thread run loop.  Running lsof shows the 
> number of open filehandles on the .lock file increasing rapidly over time. In 
> a separate test project, I reproduced the issue and determined that in order 
> for the filehandle to be relinquished the FileChannel instance must be 
> properly closed. 
> PR:
> https://github.com/apache/kafka/pull/1267
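
A minimal sketch of the fix described above, assuming an illustrative lock-file path
(this is not the Kafka Streams code itself): releasing the FileLock alone is not
enough; closing the FileChannel is what frees the underlying file handle.

{code}
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;

public class LockFileSketch {
    public static void main(String[] args) throws IOException {
        File lockFile = new File("/tmp/state-dir/.lock"); // illustrative path
        lockFile.getParentFile().mkdirs();

        // try-with-resources closes the channel when done, so lsof no longer
        // shows a lingering handle on the .lock file.
        try (FileChannel channel = FileChannel.open(lockFile.toPath(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            FileLock lock = channel.tryLock();
            if (lock != null) {
                // ... cleanup work guarded by the lock ...
                lock.release();
            }
        } // channel.close() happens here and frees the file handle
    }
}
{code}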



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3612) Add initial set of integration tests

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3612:
---
Affects Version/s: (was: 0.10.0.0)

> Add initial set of integration tests
> 
>
> Key: KAFKA-3612
> URL: https://issues.apache.org/jira/browse/KAFKA-3612
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>  Labels: test
> Fix For: 0.10.0.0
>
>
> We need to add a suite of integration tests for streams. This will include 
> wrapper classes for embedded zookeeper and Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3612) Add initial set of integration tests

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3612:
---
Affects Version/s: 0.10.0.0

> Add initial set of integration tests
> 
>
> Key: KAFKA-3612
> URL: https://issues.apache.org/jira/browse/KAFKA-3612
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
>  Labels: test
> Fix For: 0.10.0.0
>
>
> We need to add a suite of integration tests for streams. This will include 
> wrapper classes for embedded zookeeper and Kafka.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3862) Kafka streams documentation: partition<->thread<->task assignment unclear

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3862?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3862:
---
Affects Version/s: (was: 0.10.0.0)

> Kafka streams documentation: partition<->thread<->task assignment unclear
> -
>
> Key: KAFKA-3862
> URL: https://issues.apache.org/jira/browse/KAFKA-3862
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
> Environment: N/A
>Reporter: Dmitry Minkovsky
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> Hi. Not sure if this is the right place to post about documentation (this 
> isn't actually a software bug).
> I believe the following span of documentation is self-contradictory:
> !http://i.imgur.com/WM8ada2.png!
> In the paragraph text before and after the image, it says the partitions are 
> *evenly distributed* among the threads. However, in the image, Thread 1 gets 
> p1 and p3 (4 partitions, total) while Thread 2 gets p2 (2 partitions, total). 
> That's "even" in the sense that you can't do any better, but still not even, 
> and in conflict with the text that follows the image:
> !http://i.imgur.com/xq3SOre.png!
> This doesn't make sense because why would p2 from topicA and topicB go to 
> different threads? If this were the case, you couldn't join the partitions, 
> right?
> Either way, it seems like the two things are in conflict.
> Finally, the wording:
> bq. ...assume that each thread executes a single stream task...
> What does "assume" mean here? I've not seen any indication elsewhere in the 
> documentation that shows you can configure this behavior. Sorry if I missed 
> this.
> Thank you!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3629) KStreamImpl.to(...) throws NPE when the value SerDe is null

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3629:
---
Affects Version/s: (was: 0.10.0.0)

> KStreamImpl.to(...) throws NPE when the value SerDe is null
> ---
>
> Key: KAFKA-3629
> URL: https://issues.apache.org/jira/browse/KAFKA-3629
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Damian Guy
>Assignee: Guozhang Wang
> Fix For: 0.10.0.0
>
>
> On line 301 of KStreamImpl the null check is against the keySerde rather than 
> the valSerde. This results in an NPE if the valSerde is null 



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3619) State lock file handle leaks

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3619:
---
Affects Version/s: (was: 0.10.0.0)

> State lock file handle leaks
> 
>
> Key: KAFKA-3619
> URL: https://issues.apache.org/jira/browse/KAFKA-3619
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Greg Fodor
>Assignee: Greg Fodor
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> The .lock files in the state store directories do not seem to be having their 
> file handles freed (despite the locks being freed), so on a complex job the 
> number of file handles in use goes up rapidly as the locks are taken for the 
> cleanup routine at the end of the thread run loop.  Running lsof shows the 
> number of open filehandles on the .lock file increasing rapidly over time. In 
> a separate test project, I reproduced the issue and determined that in order 
> for the filehandle to be relinquished the FileChannel instance must be 
> properly closed. 
> PR:
> https://github.com/apache/kafka/pull/1267



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3016) Add KStream-KStream window joins

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3016:
---
Affects Version/s: (was: 0.9.0.1)
   0.10.0.0

> Add KStream-KStream window joins
> 
>
> Key: KAFKA-3016
> URL: https://issues.apache.org/jira/browse/KAFKA-3016
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2872) Error starting KafkaStream caused by sink not being connected to parent source/processor nodes

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2872:
---
Affects Version/s: (was: 0.10.0.0)

> Error starting KafkaStream caused by sink not being connected to parent 
> source/processor nodes
> --
>
> Key: KAFKA-2872
> URL: https://issues.apache.org/jira/browse/KAFKA-2872
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 0.10.0.0
>
>
> When starting the KafkaStream I get the following Exception:
> Exception in thread "main" java.util.NoSuchElementException: id: SINK
>   at 
> org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
>   at 
> org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
>   at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> The TopologyBuilder is being built like so:
> topologyBuilder.addSource("SOURCE", new StringDeserializer(), new 
> StringDeserializer(), "src-topic")
> .addProcessor("PROCESS", new 
> GenericProcessorClient(replaceVowels), "SOURCE")
> .addSink("SINK", "dest-topic", new StringSerializer(), new 
> StringSerializer(), "PROCESS");
> Looks to me the cause of the error is that in the TopologyBuilder.addSink method 
> the sink is never connected with its parent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2727) initialize only the part of the topology relevant to the task

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2727:
---
Affects Version/s: (was: 0.9.0.0)

> initialize only the part of the topology relevant to the task
> -
>
> Key: KAFKA-2727
> URL: https://issues.apache.org/jira/browse/KAFKA-2727
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Currently each streaming task initializes the entire topology regardless of 
> the assigned topic-partitions. This is wasteful especially when the topology 
> has local state stores. All local state stores are restored from their change 
> log topics even when they are not actually used in the task execution. To fix 
> this, the task initialization should be aware of the relevant subgraph of the 
> topology and initializes only processors and state stores in the subgraph.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2872) Error starting KafkaStream caused by sink not being connected to parent source/processor nodes

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2872:
---
Affects Version/s: 0.10.0.0

> Error starting KafkaStream caused by sink not being connected to parent 
> source/processor nodes
> --
>
> Key: KAFKA-2872
> URL: https://issues.apache.org/jira/browse/KAFKA-2872
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 0.10.0.0
>
>
> When starting the KafkaStream I get the following Exception:
> Exception in thread "main" java.util.NoSuchElementException: id: SINK
>   at 
> org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
>   at 
> org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
>   at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> The TopologyBuilder is being built like so:
> topologyBuilder.addSource("SOURCE", new StringDeserializer(), new 
> StringDeserializer(), "src-topic")
> .addProcessor("PROCESS", new 
> GenericProcessorClient(replaceVowels), "SOURCE")
> .addSink("SINK", "dest-topic", new StringSerializer(), new 
> StringSerializer(), "PROCESS");
> Looks to me the cause of the error is that in the TopologyBuilder.addSink method 
> the sink is never connected with its parent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2763) Reduce stream task migrations and initialization costs

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2763:
---
Affects Version/s: (was: 0.9.0.0)

> Reduce stream task migrations and initialization costs
> --
>
> Key: KAFKA-2763
> URL: https://issues.apache.org/jira/browse/KAFKA-2763
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Stream task assignment is not aware of either the previous task assignment or 
> local states of participating clients. By making the assignment logic aware 
> of them, we can reduce task migrations and initialization cost.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3016) Add KStream-KStream window joins

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3016?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3016:
---
Affects Version/s: (was: 0.10.0.0)

> Add KStream-KStream window joins
> 
>
> Key: KAFKA-3016
> URL: https://issues.apache.org/jira/browse/KAFKA-3016
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2706) Make state stores first class citizens in the processor DAG

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2706:
---
Affects Version/s: (was: 0.9.0.0)

> Make state stores first class citizens in the processor DAG
> ---
>
> Key: KAFKA-2706
> URL: https://issues.apache.org/jira/browse/KAFKA-2706
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2984) KTable should send old values along with new values to downstreams

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2984:
---
Affects Version/s: (was: 0.9.0.1)
   0.10.0.0

> KTable should send old values along with new values to downstreams
> --
>
> Key: KAFKA-2984
> URL: https://issues.apache.org/jira/browse/KAFKA-2984
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Old values are necessary for implementing aggregate functions. KTable should 
> augment an event with its old value. Basically KTable stream is a stream of 
> (key, (new value, old value)) internally. The old value may be omitted when 
> it is not used in the topology.
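
An illustrative sketch only (not the actual Kafka Streams internals) of the
(key, (new value, old value)) shape described above, which lets a downstream
aggregate subtract the old value and add the new one:

{code}
public class KTableChangeSketch {

    // Pair of new and old value carried with each KTable update.
    static final class Change<V> {
        final V newValue;
        final V oldValue; // null when there was no previous value
        Change(V newValue, V oldValue) { this.newValue = newValue; this.oldValue = oldValue; }
        @Override public String toString() { return "(new=" + newValue + ", old=" + oldValue + ")"; }
    }

    public static void main(String[] args) {
        // key "user-1": count changes from 3 to 4; a downstream aggregate
        // applies -3 then +4 instead of recomputing from scratch.
        Change<Integer> update = new Change<>(4, 3);
        System.out.println("user-1 -> " + update);
    }
}
{code}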



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2872) Error starting KafkaStream caused by sink not being connected to parent source/processor nodes

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2872:
---
Affects Version/s: (was: 0.9.0.0)

> Error starting KafkaStream caused by sink not being connected to parent 
> source/processor nodes
> --
>
> Key: KAFKA-2872
> URL: https://issues.apache.org/jira/browse/KAFKA-2872
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
> Fix For: 0.10.0.0
>
>
> When starting the KafkaStream I get the following Exception:
> Exception in thread "main" java.util.NoSuchElementException: id: SINK
>   at 
> org.apache.kafka.streams.processor.internals.QuickUnion.root(QuickUnion.java:40)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.makeNodeGroups(TopologyBuilder.java:387)
>   at 
> org.apache.kafka.streams.processor.TopologyBuilder.topicGroups(TopologyBuilder.java:339)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:139)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.<init>(StreamThread.java:120)
>   at 
> org.apache.kafka.streams.KafkaStreaming.<init>(KafkaStreaming.java:110)
>   at bbejeck.ProcessorDriver.main(ProcessorDriver.java:35)
> The TopologyBuilder is being built like so:
> topologyBuilder.addSource("SOURCE", new StringDeserializer(), new 
> StringDeserializer(), "src-topic")
> .addProcessor("PROCESS", new 
> GenericProcessorClient(replaceVowels), "SOURCE")
> .addSink("SINK", "dest-topic", new StringSerializer(), new 
> StringSerializer(), "PROCESS");
> Looks to me the cause of the error is that in the TopologyBuilder.addSink method 
> the sink is never connected with its parent.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3278) clientId is not unique in producer/consumer registration leads to mbean warning

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3278:
---
Affects Version/s: (was: 0.10.0.0)

> clientId is not unique in producer/consumer registration leads to mbean 
> warning
> ---
>
> Key: KAFKA-3278
> URL: https://issues.apache.org/jira/browse/KAFKA-3278
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
> Environment: Mac OS
>Reporter: Tom Dearman
>Assignee: Tom Dearman
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> The clientId passed through to StreamThread is not unique, and it is used to 
> create consumers and producers, which in turn try to register MBeans; this 
> leads to a warning that the MBean is already registered.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3060) Refactor MeteredXXStore

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3060:
---
Affects Version/s: (was: 0.10.0.0)

> Refactor MeteredXXStore
> ---
>
> Key: KAFKA-3060
> URL: https://issues.apache.org/jira/browse/KAFKA-3060
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Yasuhiro Matsuda
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> ** copied from a github comment by Guozhang Wang **
> The original motivation of having the MeteredXXStore is to wrap all metrics / 
> logging semantics into one place so they do not need to be re-implemented 
> again, but this seems to be an obstacle with the current pattern now, for 
> example MeteredWindowStore.putAndReturnInternalKey is only used for logging, 
> and MeteredWindowStore.putInternal / MeteredWindowStore.getInternal are never 
> used since only its inner will trigger this function. So how about 
> refactoring this piece as follows:
> 1. WindowStore only expose two APIs: put(K, V) and get(K, long).
> 2. Add a RollingRocksDBStores that does not extend any interface, but only 
> implements putInternal, getInternal and putAndReturnInternalKey that uses 
> underlying RocksDBStore as Segments.
> 3. RocksDBWindowStore implements WindowStore with an RollingRocksDBStores 
> inner.
> 4. Let MeteredXXStore only maintain the metrics recording logic, and let 
> different stores implement their own logging logic, since this is now 
> different across different types and are better handled separately. Also some 
> types of stores may not even have a loggingEnabled flag, if it will always 
> log, or will never log.
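
A hedged sketch of point 4 above: a metering wrapper that only records metrics and
delegates storage, leaving change-logging to the concrete store. Interface and class
names are illustrative, not the Kafka Streams classes, and a plain counter stands in
for a real Sensor.

{code}
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

public class MeteredStoreSketch {

    // Point 1 above: the window store exposes only put and get.
    interface SimpleWindowStore<K, V> {
        void put(K key, V value);
        V get(K key, long timestamp);
    }

    // Metrics-only wrapper; change-logging would live in the concrete store instead.
    static final class MeteredWindowStore<K, V> implements SimpleWindowStore<K, V> {
        private final SimpleWindowStore<K, V> inner;
        private final LongAdder puts = new LongAdder(); // stand-in for a real Sensor
        private final LongAdder gets = new LongAdder();

        MeteredWindowStore(SimpleWindowStore<K, V> inner) { this.inner = inner; }

        @Override public void put(K key, V value) { puts.increment(); inner.put(key, value); }
        @Override public V get(K key, long timestamp) { gets.increment(); return inner.get(key, timestamp); }
        long putCount() { return puts.sum(); }
    }

    public static void main(String[] args) {
        // Trivial in-memory store standing in for the RocksDB-backed inner store
        // (the timestamp argument is ignored here to keep the example short).
        Map<String, String> map = new HashMap<>();
        SimpleWindowStore<String, String> inner = new SimpleWindowStore<String, String>() {
            @Override public void put(String key, String value) { map.put(key, value); }
            @Override public String get(String key, long timestamp) { return map.get(key); }
        };
        MeteredWindowStore<String, String> store = new MeteredWindowStore<>(inner);
        store.put("k", "v");
        System.out.println(store.get("k", 0L) + ", puts=" + store.putCount());
    }
}
{code}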



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2984) KTable should send old values along with new values to downstreams

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2984?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-2984:
---
Affects Version/s: (was: 0.10.0.0)

> KTable should send old values along with new values to downstreams
> --
>
> Key: KAFKA-2984
> URL: https://issues.apache.org/jira/browse/KAFKA-2984
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Old values are necessary for implementing aggregate functions. KTable should 
> augment an event with its old value. Basically KTable stream is a stream of 
> (key, (new value, old value)) internally. The old value may be omitted when 
> it is not used in the topology.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3060) Refactor MeteredXXStore

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3060:
---
Affects Version/s: (was: 0.9.0.1)
   0.10.0.0

> Refactor MeteredXXStore
> ---
>
> Key: KAFKA-3060
> URL: https://issues.apache.org/jira/browse/KAFKA-3060
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> ** copied from a github comment by Guozhang Wang **
> The original motivation of having the MeteredXXStore is to wrap all metrics / 
> logging semantics into one place so they do not need to be re-implemented 
> again, but this seems to be an obstacle with the current pattern now, for 
> example MeteredWindowStore.putAndReturnInternalKey is only used for logging, 
> and MeteredWindowStore.putInternal / MeteredWindowStore.getInternal are never 
> used since only its inner will trigger this function. So how about 
> refactoring this piece as follows:
> 1. WindowStore only expose two APIs: put(K, V) and get(K, long).
> 2. Add a RollingRocksDBStores that does not extend any interface, but only 
> implements putInternal, getInternal and putAndReturnInternalKey that uses 
> underlying RocksDBStore as Segments.
> 3. RocksDBWindowStore implements WindowStore with an RollingRocksDBStores 
> inner.
> 4. Let MeteredXXStore only maintain the metrics recording logic, and let 
> different stores implement their own logging logic, since this is now 
> different across different types and are better handled separately. Also some 
> types of stores may not even have a loggingEnabled flag, if it will always 
> log, or will never log.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3278) clientId is not unique in producer/consumer registration leads to mbean warning

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3278?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3278:
---
Affects Version/s: (was: 0.9.0.1)
   0.10.0.0

> clientId is not unique in producer/consumer registration leads to mbean 
> warning
> ---
>
> Key: KAFKA-3278
> URL: https://issues.apache.org/jira/browse/KAFKA-3278
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.0.0
> Environment: Mac OS
>Reporter: Tom Dearman
>Assignee: Tom Dearman
>Priority: Minor
> Fix For: 0.10.0.0
>
>
> The clientId passed through to StreamThread is not unique, and it is used to 
> create consumers and producers, which in turn try to register MBeans; this 
> leads to a warning that the MBean is already registered.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3395) prefix job id to internal topic names

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3395:
---
Affects Version/s: (was: 0.9.0.1)
   0.10.0.0

> prefix job id to internal topic names
> -
>
> Key: KAFKA-3395
> URL: https://issues.apache.org/jira/browse/KAFKA-3395
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Yasuhiro Matsuda
>Assignee: Yasuhiro Matsuda
> Fix For: 0.10.0.0
>
>
> Names of internal repartition topics are not prefixed by a job id right now.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3537) Provide access to low-level Metrics in ProcessorContext

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3537:
---
Affects Version/s: 0.10.0.0
   0.10.0.1
   0.10.1.0
   0.10.1.1

> Provide access to low-level Metrics in ProcessorContext
> ---
>
> Key: KAFKA-3537
> URL: https://issues.apache.org/jira/browse/KAFKA-3537
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Michael Coon
>Assignee: Eno Thereska
>Priority: Minor
>  Labels: semantics
> Fix For: 0.10.2.0
>
>
> It would be good to have access to the underlying Metrics component in 
> StreamMetrics. StreamMetrics forces a naming convention for metrics that does 
> not fit our use case for reporting. We need to be able to convert the stream 
> metrics to our own metrics formatting and it's cumbersome to extract group/op 
> names from pre-formatted strings the way they are setup in StreamMetricsImpl. 
> If there were a "metrics()" method of StreamMetrics to give me the underlying 
> Metrics object, I could register my own sensors/metrics as needed.
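
A hedged sketch of what direct access to the underlying Metrics registry allows. It
is standalone here (it constructs its own Metrics instance); in a processor you would
obtain the registry from the proposed metrics() accessor instead.

{code}
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;

public class CustomSensorSketch {
    public static void main(String[] args) {
        Metrics metrics = new Metrics();

        // Register a sensor under our own group/name convention.
        Sensor latency = metrics.sensor("my-processor-latency");
        latency.add(metrics.metricName("latency-avg", "my-group"), new Avg());
        latency.add(metrics.metricName("latency-max", "my-group"), new Max());

        latency.record(12.5); // latency samples in ms
        latency.record(7.0);

        metrics.close();
    }
}
{code}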



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3537) Provide access to low-level Metrics in ProcessorContext

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3537?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3537:
---
Affects Version/s: (was: 0.9.0.1)

> Provide access to low-level Metrics in ProcessorContext
> ---
>
> Key: KAFKA-3537
> URL: https://issues.apache.org/jira/browse/KAFKA-3537
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1
>Reporter: Michael Coon
>Assignee: Eno Thereska
>Priority: Minor
>  Labels: semantics
> Fix For: 0.10.2.0
>
>
> It would be good to have access to the underlying Metrics component in 
> StreamMetrics. StreamMetrics forces a naming convention for metrics that does 
> not fit our use case for reporting. We need to be able to convert the stream 
> metrics to our own metrics formatting and it's cumbersome to extract group/op 
> names from pre-formatted strings the way they are setup in StreamMetricsImpl. 
> If there were a "metrics()" method of StreamMetrics to give me the underlying 
> Metrics object, I could register my own sensors/metrics as needed.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-3535) Add metrics ability for streams serde components

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3535:
---
Affects Version/s: (was: 0.9.0.1)
   0.10.0.0
   0.10.0.1
   0.10.1.0
   0.10.1.1
   0.10.2.0

> Add metrics ability for streams serde components
> 
>
> Key: KAFKA-3535
> URL: https://issues.apache.org/jira/browse/KAFKA-3535
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Michael Coon
>Priority: Minor
>  Labels: user-experience
>
> Add the ability to record metrics in the serializer/deserializer components. 
> As it stands, I cannot record latency/sensor metrics since the API does not 
> provide the context at the serde levels. Exposing the ProcessorContext at 
> this level may not be the solution; but perhaps change the configure method 
> to take a different config or init context and make the StreamMetrics 
> available in that context along with config information.
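
A hedged sketch of a workaround for the gap described above: wrap an existing
Serializer and record timings yourself, since the serde API does not expose
StreamMetrics. The class name and the way timings are collected are illustrative.

{code}
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.serialization.Serializer;

public class TimedSerializer<T> implements Serializer<T> {
    private final Serializer<T> inner;
    private final AtomicLong totalNanos = new AtomicLong(); // report via your own metrics pipeline

    public TimedSerializer(Serializer<T> inner) { this.inner = inner; }

    @Override public void configure(Map<String, ?> configs, boolean isKey) {
        inner.configure(configs, isKey);
    }

    @Override public byte[] serialize(String topic, T data) {
        long start = System.nanoTime();
        try {
            return inner.serialize(topic, data);
        } finally {
            totalNanos.addAndGet(System.nanoTime() - start);
        }
    }

    @Override public void close() { inner.close(); }

    public long totalSerializationNanos() { return totalNanos.get(); }
}
{code}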



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [DISCUSS] KIP-82 - Add Record Headers

2017-02-21 Thread Jason Gustafson
Hey Michael,

Awesome. I have a minor request. The APIs are currently documented as a
wiki list. Would you mind adding a code snippet instead? It's a bit easier
to process.

How will it be best to manage this, as we will obviously build off your KIP’s
> protocol changes? To avoid merge hell, should we branch from your branch
> in the confluent repo, or is it worth having a special KIP-98 branch in the
> apache git that we can branch/fork from?


I was thinking about this also. Ideally we'd like to get the changes in as
close together as possible since we only want one magic bump and some users
deploy trunk. The level of effort to change the format for headers seems
not too high. Do you agree? My guess is that the KIP-98 message format
patch will take 2-3 weeks to review before we merge to trunk, so you could
hold off implementing until that patch has somewhat stabilized. That would
save some potential rebase pain.

-Jason


[jira] [Updated] (KAFKA-3534) Deserialize on demand when default time extractor used

2017-02-21 Thread Matthias J. Sax (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-3534:
---
Affects Version/s: (was: 0.9.0.1)
   0.10.0.0
   0.10.0.1
   0.10.1.0
   0.10.1.1
   0.10.2.0

> Deserialize on demand when default time extractor used
> --
>
> Key: KAFKA-3534
> URL: https://issues.apache.org/jira/browse/KAFKA-3534
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0
>Reporter: Michael Coon
>Priority: Minor
>  Labels: performance
>
> When records are added to the RecordQueue, they are deserialized at that time 
> in order to extract the timestamp. But for some data flows where large 
> messages are consumed (particularly compressed messages), this can result in 
> large spikes in memory as all messages must be deserialized prior to 
> processing (and can lead to out-of-memory errors). An optimization might be to only 
> require deserialization at this stage if a non-default timestamp extractor is 
> being used.
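
A hedged sketch of the optimization idea above (illustrative types, not the
RecordQueue code): when the timestamp comes from the record itself, keep the payload
as raw bytes and defer deserialization until the record is actually processed.

{code}
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class LazyDeserializeSketch {

    static final class QueuedRecord {
        final long timestamp;  // taken from the log record, no deserialization needed
        final byte[] rawValue; // deserialized only on demand
        QueuedRecord(long timestamp, byte[] rawValue) { this.timestamp = timestamp; this.rawValue = rawValue; }
        <V> V value(Function<byte[], V> deserializer) { return deserializer.apply(rawValue); }
    }

    public static void main(String[] args) {
        QueuedRecord r = new QueuedRecord(1487721600000L, "payload".getBytes(StandardCharsets.UTF_8));
        // Deserialization happens here, at processing time, not at enqueue time.
        String value = r.value(bytes -> new String(bytes, StandardCharsets.UTF_8));
        System.out.println(r.timestamp + " -> " + value);
    }
}
{code}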



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: Kafka Connect / Access to OffsetStorageReader from SourceConnector

2017-02-21 Thread Jason Gustafson
Hey Florian,

It seems reasonable to me to let the connector track task progress through
offsets. I recall there have been other use cases for communication between
tasks and connectors (perhaps Ewen or someone else will jump in here and
mention them), so I'm not sure whether this could fall under a more
general solution. Using the offsets topic has the advantage of simplicity
since it doesn't add anything new. Perhaps start with a JIRA describing the
use case and see if anyone has additional feedback?

-Jason

On Mon, Feb 20, 2017 at 3:24 AM, Florian Hussonnois 
wrote:

> Hi Jason,
>
> Yes, this is the idea. The connector assigns a subset of files to each
> task.
>
> A task stores the file size, the byte offset, and the byte size of the
> last sent record as source offsets.
> A file is finished when recordBytesOffsets + recordBytesSize =
> fileBytesSize.
>
> The connector should be able to start a background thread to track
> offsets for each assigned file.
> When all tasks have finished, the connector can stop tasks or assign new
> files by requesting a task reconfiguration.
>
> Another advantage of monitoring source offsets from the connector is detecting
> slow or failed tasks and, if necessary, being able to restart all tasks.
>
> Thanks,
>
> 2017-02-18 6:47 GMT+01:00 Jason Gustafson :
>
> > Hey Florian,
> >
> > Can you explain a bit more how having access to the offset storage from
> the
> > connector helps in your use case? I guess you are planning to use offsets
> > to be able to tell when a task has finished a file?
> >
> > Thanks,
> > Jason
> >
> > On Fri, Feb 17, 2017 at 4:45 AM, Florian Hussonnois <
> fhussonn...@gmail.com
> > >
> > wrote:
> >
> > > Hi Kafka Team,
> > >
> > > I'm developing a connector which needs to monitor the progress of its
> > tasks
> > > in order to be able to request a tasks reconfiguration in some
> > situations.
> > >
> > > Our connector is pretty simple. It's used to stream thousands of
> files
> > > into Kafka. The connector scans directories then schedules each task
> > with a
> > > set of assigned files.
> > > When tasks are no longer required or new files are detected the
> connector
> > > requests a reconfiguration.
> > >
> > > In addition, files are stored in a shared storage which is accessible
> > from
> > > each connect worker. In that way, we can distribute file streaming.
> > >
> > > For that prupose, it would be very convenient to have access to an
> > > offsetStorageReader instance from either the Connector class or the
> > > ConnectorContext class.
> > >
> > > I found a similar question:
> > > https://www.mail-archive.com/dev@kafka.apache.org/msg50579.html
> > >
> > > Do you think this improvement could be considered ? I can contribute to
> > it.
> > >
> > > Thanks,
> > >
> > > --
> > > Florian HUSSONNOIS
> > >
> >
>
>
>
> --
> Florian HUSSONNOIS
>


Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-21 Thread Michael Pearce
If the argument and objective within this KIP is to keep the protocol overhead as 
small as possible and remove redundancy, with every byte being counted and varInts 
being introduced, then it would make sense to me to use attributes.


On 22/02/2017, 00:14, "Jason Gustafson"  wrote:

Done. I've left the key and value as optional since we may not have reached
consensus on whether to use attributes or not. Perhaps we should just keep
it simple and not do it? The benefit seems small.

-Jason

On Tue, Feb 21, 2017 at 4:05 PM, Michael Pearce 
wrote:

> Ok, no worries, can you add it back ValueLen on this KIP, and update the
> doc, then we can work from that ☺
>
> Cheers
> Mike
>
> On 22/02/2017, 00:02, "Jason Gustafson"  wrote:
>
> I feel it was a little odd to leave out the value length anyway, so I
> would
> rather add it back and put headers at the end. This is more consistent
> with
> the rest of the Kafka protocol.
>
> -Jason
>
> On Tue, Feb 21, 2017 at 3:58 PM, Michael Pearce  >
> wrote:
>
> > Or we keep as is (valuelen removed), and headers are added with
> headers
> > length..
> >
> > On 21/02/2017, 23:38, "Apurva Mehta"  wrote:
> >
> > Right now, we don't need the value length: since it is the last
> item
> > in the
> > message, and we have the message length, we can deduce the value
> > length.
> > However, if we are adding record headers to the end, we would
> need to
> > introduce the value length along with that change.
> >
> > On Tue, Feb 21, 2017 at 3:34 PM, Michael Pearce <
> michael.pea...@ig.com
> > >
> > wrote:
> >
> > > It seems I cannot add comment on the doc.
> > >
> > > In the section around the message protocol.
> > >
> > > It has stated:
> > >
> > > Message =>
> > > Length => uintVar
> > > Attributes => int8
> > > TimestampDelta => intVar
> > > OffsetDelta => uintVar
> > > KeyLen => uintVar [OPTIONAL]
> > > Key => data [OPTIONAL]
> > > Value => data [OPTIONAL]
> > >
> > > Should it not be: (added missing value len)
> > >
> > > Message =>
> > > Length => uintVar
> > > Attributes => int8
> > > TimestampDelta => intVar
> > > OffsetDelta => uintVar
> > > KeyLen => uintVar [OPTIONAL]
> > > Key => data [OPTIONAL]
> > > ValueLen => uintVar [OPTIONAL]
> > > Value => data [OPTIONAL]
> > >
> > >
> > >
> > > On 21/02/2017, 23:07, "Joel Koshy" 
> wrote:
> > >
> > > I left a couple of comments/questions directly on the
> google-doc
> > >  > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8>
> > > - I found it much more tractable for a proposal of this
> size to
> > > discuss in
> > > context within the doc. The permissions on the doc don't
> let
> > everyone
> > > view
> > > comments, so if there are any material changes that come
> out of
> > the
> > > discussions in those comment threads we can summarize 
here.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Mon, Feb 20, 2017 at 4:08 PM, Becket Qin <
> > becket@gmail.com>
> > > wrote:
> > >
> > > > Thanks for the explanation, Guozhang. That makes sense.
> > > >
> > > > On Sun, Feb 19, 2017 at 7:28 PM, Guozhang Wang <
> > wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Thanks Becket.
> > > > >
> > > > > Actually sequence is associated with a message, not a
> > message set.
> > > For
> > > > > example if a message set sent by producer contains 100
> > messages,
> > > and the
> > > > > first message's sequence is 5, then the last message's
> > sequence
> > > number
> > > > > would be 104, and the next message set's first
> sequence is
> > > expected to be
> > > > > 105.
> > > > >
> > > > >
> > > > > Guozhang
>

Re: [DISCUSS] KIP-124: Request rate quotas

2017-02-21 Thread Jun Rao
Hi, Rajini,

Thanks for the proposal.

The benefit of using the request processing time over the request rate is
exactly what people have said. I will just expand that a bit. Consider the
following case. The producer sends a produce request with a 10MB message
but compressed to 100KB with gzip. The decompression of the message on the
broker could take 10-15 seconds, during which time, a request handler
thread is completely blocked. In this case, neither the byte-in quota nor
the request rate quota may be effective in protecting the broker. Consider
another case. A consumer group starts with 10 instances and later on
switches to 20 instances. The request rate will likely double, but the
actual load on the broker may not double since each fetch request only
contains half of the partitions. Request rate quota may not be easy to
configure in this case.

What we really want is to be able to prevent a client from using too much
of the server side resources. In this particular KIP, this resource is the
capacity of the request handler threads. I agree that it may not be
intuitive for the users to determine how to set the right limit. However,
this is not completely new and has been done in the container world
already. For example, Linux cgroup (https://access.redhat.com/
documentation/en-US/Red_Hat_Enterprise_Linux/6/html/
Resource_Management_Guide/sec-cpu.html) has the concept of cpu.cfs_quota_us,
which specifies the total amount of time in microseconds for which all
tasks in a cgroup can run during a one second period. We can potentially
model the request handler threads in a similar way. For example, each
request handler thread can be 1 request handler unit and the admin can
configure a limit on how many units (say 0.01) a client can have.
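
To make the unit idea concrete, here is an illustrative calculation only (not Kafka
code, and the thread count is an assumption for the example): with 8 request handler
threads, a per-client quota of 0.01 units corresponds to 1% of one thread, i.e. 10 ms
of handler time per second.

{code}
public class RequestQuotaSketch {
    public static void main(String[] args) {
        int handlerThreads = 8;         // assumed num.io.threads for the example
        double clientQuotaUnits = 0.01; // admin-configured limit for this client
        long windowMs = 1000;           // one-second accounting window

        long totalCapacityMs = handlerThreads * windowMs; // 8000 ms of handler time per second
        double allowedMs = clientQuotaUnits * windowMs;   // 10 ms per second for this client

        long observedMs = 25; // handler time the client actually used this window
        boolean throttle = observedMs > allowedMs;
        System.out.printf("capacity=%dms allowed=%.1fms used=%dms throttle=%b%n",
                totalCapacityMs, allowedMs, observedMs, throttle);
    }
}
{code}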

Regarding not throttling the internal broker to broker requests. We could
do that. Alternatively, we could just let the admin configure a high limit
for the kafka user (it may not be able to do that easily based on clientId
though).

Ideally we want to be able to protect the utilization of the network thread
pool too. The difficulty is mostly what Rajini said: (1) The mechanism for
throttling the requests is through Purgatory and we will have to think
through how to integrate that into the network layer.  (2) In the network
layer, currently we know the user, but not the clientId of the request. So,
it's a bit tricky to throttle based on clientId there. Plus, the byteOut
quota can already protect the network thread utilization for fetch
requests. So, if we can't figure out this part right now, just focusing on
the request handling threads for this KIP is still a useful feature.

Thanks,

Jun


On Tue, Feb 21, 2017 at 4:27 AM, Rajini Sivaram 
wrote:

> Thank you all for the feedback.
>
> Jay: I have removed exemption for consumer heartbeat etc. Agree that
> protecting the cluster is more important than protecting individual apps.
> Have retained the exemption for StopReplica/LeaderAndIsr etc, these are
> throttled only if authorization fails (so can't be used for DoS attacks in
> a secure cluster, but allows inter-broker requests to complete without
> delays).
>
> I will wait another day to see if there is any objection to quotas based on
> request processing time (as opposed to request rate) and if there are no
> objections, I will revert to the original proposal with some changes.
>
> The original proposal was only including the time used by the request
> handler threads (that made calculation easy). I think the suggestion is to
> include the time spent in the network threads as well since that may be
> significant. As Jay pointed out, it is more complicated to calculate the
> total available CPU time and convert to a ratio when there are *m* I/O threads
> and *n* network threads. ThreadMXBean#getThreadCPUTime() may give us what
> we want, but it can be very expensive on some platforms. As Becket and
> Guozhang have pointed out, we do have several time measurements already for
> generating metrics that we could use, though we might want to switch to
> nanoTime() instead of currentTimeMillis() since some of the values for
> small requests may be < 1ms. But rather than add up the time spent in I/O
> thread and network thread, wouldn't it be better to convert the time spent
> on each thread into a separate ratio? UserA has a request quota of 5%. Can
> we take that to mean that UserA can use 5% of the time on network threads
> and 5% of the time on I/O threads? If either is exceeded, the response is
> throttled - it would mean maintaining two sets of metrics for the two
> durations, but would result in more meaningful ratios. We could define two
> quota limits (UserA has 5% of request threads and 10% of network threads),
> but that seems unnecessary and harder to explain to users.
>
> Back to why and how quotas are applied to network thread utilization:
> a) In the case of fetch,  the time spent in the network thread may be
> significant and I can see the 

[jira] [Updated] (KAFKA-2273) KIP-54: Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-02-21 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2273:
---
Summary: KIP-54: Add rebalance with a minimal number of reassignments to 
server-defined strategy list  (was: Kip-54: Add rebalance with a minimal number 
of reassignments to server-defined strategy list)

> KIP-54: Add rebalance with a minimal number of reassignments to 
> server-defined strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.
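
As a hedged illustration of how such a strategy is selected, the sketch below
configures the existing RoundRobinAssignor via partition.assignment.strategy; the
KIP-54 sticky assignor described above would be plugged in the same way once it is
available (its final class name is not assumed here).

{code}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;

public class AssignorConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  RoundRobinAssignor.class.getName());
        System.out.println(props);
    }
}
{code}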



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2273) Kip-54: Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-02-21 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2273:
---
Summary: Kip-54: Add rebalance with a minimal number of reassignments to 
server-defined strategy list  (was: Add rebalance with a minimal number of 
reassignments to server-defined strategy list)

> Kip-54: Add rebalance with a minimal number of reassignments to 
> server-defined strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2273) Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-02-21 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2273:
---
Status: Patch Available  (was: In Progress)

> Add rebalance with a minimal number of reassignments to server-defined 
> strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-2273) Add rebalance with a minimal number of reassignments to server-defined strategy list

2017-02-21 Thread Vahid Hashemian (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vahid Hashemian updated KAFKA-2273:
---
Labels: kip  (was: kip newbie++ newbiee)

> Add rebalance with a minimal number of reassignments to server-defined 
> strategy list
> 
>
> Key: KAFKA-2273
> URL: https://issues.apache.org/jira/browse/KAFKA-2273
> Project: Kafka
>  Issue Type: Sub-task
>  Components: consumer
>Reporter: Olof Johansson
>Assignee: Vahid Hashemian
>  Labels: kip
>
> Add a new partitions.assignment.strategy to the server-defined list that will 
> do reassignments based on moving as few partitions as possible. This should 
> be a quite common reassignment strategy especially for the cases where the 
> consumer has to maintain state, either in memory, or on disk.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-21 Thread Jason Gustafson
Done. I've left the key and value as optional since we may not have reached
consensus on whether to use attributes or not. Perhaps we should just keep
it simple and not do it? The benefit seems small.

-Jason

On Tue, Feb 21, 2017 at 4:05 PM, Michael Pearce 
wrote:

> Ok, no worries, can you add it back ValueLen on this KIP, and update the
> doc, then we can work from that ☺
>
> Cheers
> Mike
>
> On 22/02/2017, 00:02, "Jason Gustafson"  wrote:
>
> I feel it was a little odd to leave out the value length anyway, so I
> would
> rather add it back and put headers at the end. This is more consistent
> with
> the rest of the Kafka protocol.
>
> -Jason
>
> On Tue, Feb 21, 2017 at 3:58 PM, Michael Pearce  >
> wrote:
>
> > Or we keep as is (valuelen removed), and headers are added with
> headers
> > length..
> >
> > On 21/02/2017, 23:38, "Apurva Mehta"  wrote:
> >
> > Right now, we don't need the value length: since it is the last
> item
> > in the
> > message, and we have the message length, we can deduce the value
> > length.
> > However, if we are adding record headers to the end, we would
> need to
> > introduce the value length along with that change.
> >
> > On Tue, Feb 21, 2017 at 3:34 PM, Michael Pearce <
> michael.pea...@ig.com
> > >
> > wrote:
> >
> > > It seems I cannot add comment on the doc.
> > >
> > > In the section around the message protocol.
> > >
> > > It has stated:
> > >
> > > Message =>
> > > Length => uintVar
> > > Attributes => int8
> > > TimestampDelta => intVar
> > > OffsetDelta => uintVar
> > > KeyLen => uintVar [OPTIONAL]
> > > Key => data [OPTIONAL]
> > > Value => data [OPTIONAL]
> > >
> > > Should it not be: (added missing value len)
> > >
> > > Message =>
> > > Length => uintVar
> > > Attributes => int8
> > > TimestampDelta => intVar
> > > OffsetDelta => uintVar
> > > KeyLen => uintVar [OPTIONAL]
> > > Key => data [OPTIONAL]
> > > ValueLen => uintVar [OPTIONAL]
> > > Value => data [OPTIONAL]
> > >
> > >
> > >
> > > On 21/02/2017, 23:07, "Joel Koshy" 
> wrote:
> > >
> > > I left a couple of comments/questions directly on the
> google-doc
> > >  > > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8>
> > > - I found it much more tractable for a proposal of this
> size to
> > > discuss in
> > > context within the doc. The permissions on the doc don't
> let
> > everyone
> > > view
> > > comments, so if there are any material changes that come
> out of
> > the
> > > discussions in those comment threads we can summarize here.
> > >
> > > Thanks,
> > >
> > > Joel
> > >
> > > On Mon, Feb 20, 2017 at 4:08 PM, Becket Qin <
> > becket@gmail.com>
> > > wrote:
> > >
> > > > Thanks for the explanation, Guozhang. That makes sense.
> > > >
> > > > On Sun, Feb 19, 2017 at 7:28 PM, Guozhang Wang <
> > wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Thanks Becket.
> > > > >
> > > > > Actually sequence is associated with a message, not a
> > message set.
> > > For
> > > > > example if a message set sent by producer contains 100
> > messages,
> > > and the
> > > > > first message's sequence is 5, then the last message's
> > sequence
> > > number
> > > > > would be 104, and the next message set's first
> sequence is
> > > expected to be
> > > > > 105.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Sun, Feb 19, 2017 at 4:48 PM, Becket Qin <
> > becket@gmail.com>
> > > > wrote:
> > > > >
> > > > > > +1. Thanks for the great work on the KIP!
> > > > > >
> > > > > > I have only one minor question, in the wiki (and the
> doc)
> > the new
> > > > message
> > > > > > set format has a "FirstSequence" field, should it
> just be
> > > "Sequence" if
> > > > > the
> > > > > > sequence is always associated with a message set?
> > > > > >
> > > > > > On Fri, Feb 17, 2017 at 3:28 AM, Michael Pearce <
> > > michael.pea...@ig.com
> > > > >
> > > > > > wrote:
> > > > > >

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-21 Thread Michael Pearce
Ok, no worries. Can you add ValueLen back on this KIP and update the doc? Then 
we can work from that ☺

Cheers
Mike

On 22/02/2017, 00:02, "Jason Gustafson"  wrote:

I feel it was a little odd to leave out the value length anyway, so I would
rather add it back and put headers at the end. This is more consistent with
the rest of the Kafka protocol.

-Jason

On Tue, Feb 21, 2017 at 3:58 PM, Michael Pearce 
wrote:

> Or we keep as is (valuelen removed), and headers are added with headers
> length..
>
> On 21/02/2017, 23:38, "Apurva Mehta"  wrote:
>
> Right now, we don't need the value length: since it is the last item
> in the
> message, and we have the message length, we can deduce the value
> length.
> However, if we are adding record headers to the end, we would need to
> introduce the value length along with that change.
>
> On Tue, Feb 21, 2017 at 3:34 PM, Michael Pearce  >
> wrote:
>
> > It seems I cannot add comment on the doc.
> >
> > In the section around the message protocol.
> >
> > It has stated:
> >
> > Message =>
> > Length => uintVar
> > Attributes => int8
> > TimestampDelta => intVar
> > OffsetDelta => uintVar
> > KeyLen => uintVar [OPTIONAL]
> > Key => data [OPTIONAL]
> > Value => data [OPTIONAL]
> >
> > Should it not be: (added missing value len)
> >
> > Message =>
> > Length => uintVar
> > Attributes => int8
> > TimestampDelta => intVar
> > OffsetDelta => uintVar
> > KeyLen => uintVar [OPTIONAL]
> > Key => data [OPTIONAL]
> > ValueLen => uintVar [OPTIONAL]
> > Value => data [OPTIONAL]
> >
> >
> >
> > On 21/02/2017, 23:07, "Joel Koshy"  wrote:
> >
> > I left a couple of comments/questions directly on the google-doc
> >  > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8>
> > - I found it much more tractable for a proposal of this size to
> > discuss in
> > context within the doc. The permissions on the doc don't let
> everyone
> > view
> > comments, so if there are any material changes that come out of
> the
> > discussions in those comment threads we can summarize here.
> >
> > Thanks,
> >
> > Joel
> >
> > On Mon, Feb 20, 2017 at 4:08 PM, Becket Qin <
> becket@gmail.com>
> > wrote:
> >
> > > Thanks for the explanation, Guozhang. That makes sense.
> > >
> > > On Sun, Feb 19, 2017 at 7:28 PM, Guozhang Wang <
> wangg...@gmail.com>
> > wrote:
> > >
> > > > Thanks Becket.
> > > >
> > > > Actually sequence is associated with a message, not a
> message set.
> > For
> > > > example if a message set sent by producer contains 100
> messages,
> > and the
> > > > first message's sequence is 5, then the last message's
> sequence
> > number
> > > > would be 104, and the next message set's first sequence is
> > expected to be
> > > > 105.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sun, Feb 19, 2017 at 4:48 PM, Becket Qin <
> becket@gmail.com>
> > > wrote:
> > > >
> > > > > +1. Thanks for the great work on the KIP!
> > > > >
> > > > > I have only one minor question, in the wiki (and the doc)
> the new
> > > message
> > > > > set format has a "FirstSequence" field, should it just be
> > "Sequence" if
> > > > the
> > > > > sequence is always associated with a message set?
> > > > >
> > > > > On Fri, Feb 17, 2017 at 3:28 AM, Michael Pearce <
> > michael.pea...@ig.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > +0
> > > > > >
> > > > > > I think need some unified agreement on the VarInts.
> > > > > >
> > > > > > Would this also change in all other area’s of the
> protocol,
> > e.g.
> > > value
> > > > > and
> > > > > > key length in message protocol, to keep this uniform
> across all
> > > > protocols
> > > > > > going forwards?
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-21 Thread Jason Gustafson
I feel it was a little odd to leave out the value length anyway, so I would
rather add it back and put headers at the end. This is more consistent with
the rest of the Kafka protocol.

-Jason

On Tue, Feb 21, 2017 at 3:58 PM, Michael Pearce 
wrote:

> Or we keep as is (valuelen removed), and headers are added with headers
> length..
>
> On 21/02/2017, 23:38, "Apurva Mehta"  wrote:
>
> Right now, we don't need the value length: since it is the last item
> in the
> message, and we have the message length, we can deduce the value
> length.
> However, if we are adding record headers to the end, we would need to
> introduce the value length along with that change.
>
> On Tue, Feb 21, 2017 at 3:34 PM, Michael Pearce  >
> wrote:
>
> > It seems I cannot add comment on the doc.
> >
> > In the section around the message protocol.
> >
> > It has stated:
> >
> > Message =>
> > Length => uintVar
> > Attributes => int8
> > TimestampDelta => intVar
> > OffsetDelta => uintVar
> > KeyLen => uintVar [OPTIONAL]
> > Key => data [OPTIONAL]
> > Value => data [OPTIONAL]
> >
> > Should it not be: (added missing value len)
> >
> > Message =>
> > Length => uintVar
> > Attributes => int8
> > TimestampDelta => intVar
> > OffsetDelta => uintVar
> > KeyLen => uintVar [OPTIONAL]
> > Key => data [OPTIONAL]
> > ValueLen => uintVar [OPTIONAL]
> > Value => data [OPTIONAL]
> >
> >
> >
> > On 21/02/2017, 23:07, "Joel Koshy"  wrote:
> >
> > I left a couple of comments/questions directly on the google-doc
> >  > GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8>
> > - I found it much more tractable for a proposal of this size to
> > discuss in
> > context within the doc. The permissions on the doc don't let
> everyone
> > view
> > comments, so if there are any material changes that come out of
> the
> > discussions in those comment threads we can summarize here.
> >
> > Thanks,
> >
> > Joel
> >
> > On Mon, Feb 20, 2017 at 4:08 PM, Becket Qin <
> becket@gmail.com>
> > wrote:
> >
> > > Thanks for the explanation, Guozhang. That makes sense.
> > >
> > > On Sun, Feb 19, 2017 at 7:28 PM, Guozhang Wang <
> wangg...@gmail.com>
> > wrote:
> > >
> > > > Thanks Becket.
> > > >
> > > > Actually sequence is associated with a message, not a
> message set.
> > For
> > > > example if a message set sent by producer contains 100
> messages,
> > and the
> > > > first message's sequence is 5, then the last message's
> sequence
> > number
> > > > would be 104, and the next message set's first sequence is
> > expected to be
> > > > 105.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Sun, Feb 19, 2017 at 4:48 PM, Becket Qin <
> becket@gmail.com>
> > > wrote:
> > > >
> > > > > +1. Thanks for the great work on the KIP!
> > > > >
> > > > > I have only one minor question: in the wiki (and the doc)
> the new
> > > message
> > > > > set format has a "FirstSequence" field; should it just be
> > "Sequence" if
> > > > the
> > > > > sequence is always associated with a message set?
> > > > >
> > > > > On Fri, Feb 17, 2017 at 3:28 AM, Michael Pearce <
> > michael.pea...@ig.com
> > > >
> > > > > wrote:
> > > > >
> > > > > > +0
> > > > > >
> > > > > > I think we need some unified agreement on the VarInts.
> > > > > >
> > > > > > Would this also change in all other areas of the
> protocol,
> > e.g.
> > > value
> > > > > and
> > > > > > key length in message protocol, to keep this uniform
> across all
> > > > protocols
> > > > > > going forwards?
> > > > > >
> > > > > >
> > > > > >
> > > > > > On 17/02/2017, 00:23, "Apurva Mehta" <
> apu...@confluent.io>
> > wrote:
> > > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > Thanks for the reply. Comments inline.
> > > > > >
> > > > > > On Thu, Feb 16, 2017 at 2:29 PM, Jun Rao <
> j...@confluent.io
> > >
> > > wrote:
> > > > > >
> > > > > > > Hi, Apurva,
> > > > > > >
> > > > > > > Thanks for the reply. A couple of comments below.
> > > > > > >
> > > > > > > On Wed, Feb 15, 2017 at 9:45 PM, Apurva Mehta <
> > > > apu...@confluent.io
> > > > > >
> > > > > > wrote:
> > > > > > >
> > 

Re: [VOTE] KIP-98: Exactly Once Delivery and Transactional Messaging

2017-02-21 Thread Michael Pearce
Or we keep it as is (valuelen removed), and headers are added with a headers length.

On 21/02/2017, 23:38, "Apurva Mehta"  wrote:

Right now, we don't need the value length: since it is the last item in the
message, and we have the message length, we can deduce the value length.
However, if we are adding record headers to the end, we would need to
introduce the value length along with that change.

On Tue, Feb 21, 2017 at 3:34 PM, Michael Pearce 
wrote:

> It seems I cannot add a comment on the doc.
>
> In the section on the message protocol, it states:
>
> Message =>
> Length => uintVar
> Attributes => int8
> TimestampDelta => intVar
> OffsetDelta => uintVar
> KeyLen => uintVar [OPTIONAL]
> Key => data [OPTIONAL]
> Value => data [OPTIONAL]
>
> Should it not be the following (with the missing ValueLen added):
>
> Message =>
> Length => uintVar
> Attributes => int8
> TimestampDelta => intVar
> OffsetDelta => uintVar
> KeyLen => uintVar [OPTIONAL]
> Key => data [OPTIONAL]
> ValueLen => uintVar [OPTIONAL]
> Value => data [OPTIONAL]
>
>
>
> On 21/02/2017, 23:07, "Joel Koshy"  wrote:
>
> I left a couple of comments/questions directly on the google-doc
>  GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8>
> - I found it much more tractable for a proposal of this size to
> discuss in
> context within the doc. The permissions on the doc don't let everyone
> view
> comments, so if there are any material changes that come out of the
> discussions in those comment threads we can summarize here.
>
> Thanks,
>
> Joel
>
> On Mon, Feb 20, 2017 at 4:08 PM, Becket Qin 
> wrote:
>
> > Thanks for the explanation, Guozhang. That makes sense.
> >
> > On Sun, Feb 19, 2017 at 7:28 PM, Guozhang Wang 
> wrote:
> >
> > > Thanks Becket.
> > >
> > > Actually sequence is associated with a message, not a message set.
> For
> > > example if a message set sent by producer contains 100 messages,
> and the
> > > first message's sequence is 5, then the last message's sequence
> number
> > > would be 104, and the next message set's first sequence is
> expected to be
> > > 105.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Sun, Feb 19, 2017 at 4:48 PM, Becket Qin 
> > wrote:
> > >
> > > > +1. Thanks for the great work on the KIP!
> > > >
> > > > I have only one minor question: in the wiki (and the doc) the
new
> > message
> > > > set format has a "FirstSequence" field; should it just be
> "Sequence" if
> > > the
> > > > sequence is always associated with a message set?
> > > >
> > > > On Fri, Feb 17, 2017 at 3:28 AM, Michael Pearce <
> michael.pea...@ig.com
> > >
> > > > wrote:
> > > >
> > > > > +0
> > > > >
> > > > > I think we need some unified agreement on the VarInts.
> > > > >
> > > > > Would this also change in all other areas of the protocol,
> e.g.
> > value
> > > > and
> > > > > key length in message protocol, to keep this uniform across 
all
> > > protocols
> > > > > going forwards?
> > > > >
> > > > >
> > > > >
> > > > > On 17/02/2017, 00:23, "Apurva Mehta" 
> wrote:
> > > > >
> > > > > Hi Jun,
> > > > >
> > > > > Thanks for the reply. Comments inline.
> > > > >
> > > > > On Thu, Feb 16, 2017 at 2:29 PM, Jun Rao 
 >
> > wrote:
> > > > >
> > > > > > Hi, Apurva,
> > > > > >
> > > > > > Thanks for the reply. A couple of comments below.
> > > > > >
> > > > > > On Wed, Feb 15, 2017 at 9:45 PM, Apurva Mehta <
> > > apu...@confluent.io
> > > > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Jun,
> > > > > > >
> > > > > > > Answers inline:
> > > > > > >
> > > > > > > 210. Pid snapshots: Is the number of pid snapshots
> > configurable
> > > or
> > > > > > hardcoded
> > > > > > > > with 2? When do we decide to roll a new snapshot?
> Based on
> > > > time,
> > > > > byte,
> > > > > > or
> > > > > > > > offset? Is that configurable too?
> > > > > > > >
> > > > > >
> > > > >
