[jira] [Updated] (KAFKA-8343) streams application crashed due to rocksdb

2019-05-08 Thread gaoshu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gaoshu updated KAFKA-8343:
--
Attachment: fullsizeoutput_6.jpeg

> streams application crashed due to rocksdb
> --
>
> Key: KAFKA-8343
> URL: https://issues.apache.org/jira/browse/KAFKA-8343
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: centos 7 jdk8 kafka-streams1.0
>Reporter: gaoshu
>Priority: Major
> Attachments: fullsizeoutput_6.jpeg
>
>
> My streams application always crashes within a few days. The crash log looks 
> like https://github.com/facebook/rocksdb/issues/5234, so I think it may be 
> because RocksDBStore.java is closed incorrectly under multithreading. Looking 
> through the code below, db.close() should run after closeOpenIterators(). 
> However, db.close() may be executed before the iterators are closed due to 
> instruction reordering. I hope my guess is correct.
> {code:java}
> // RocksDBStore.java
> @Override
> public synchronized void close() {
> if (!open) {
> return;
> }
> open = false;
> closeOpenIterators();
> options.close();
> wOptions.close();
> fOptions.close();
> db.close();
> options = null;
> wOptions = null;
> fOptions = null;
> db = null;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8343) streams application crashed due to rocksdb

2019-05-08 Thread gaoshu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

gaoshu updated KAFKA-8343:
--
Attachment: (was: fullsizeoutput_1.jpeg)

> streams application crashed due to rocksdb
> --
>
> Key: KAFKA-8343
> URL: https://issues.apache.org/jira/browse/KAFKA-8343
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: centos 7 jdk8 kafka-streams1.0
>Reporter: gaoshu
>Priority: Major
>
> My streams application always crashes within a few days. The crash log looks 
> like https://github.com/facebook/rocksdb/issues/5234, so I think it may be 
> because RocksDBStore.java is closed incorrectly under multithreading. Looking 
> through the code below, db.close() should run after closeOpenIterators(). 
> However, db.close() may be executed before the iterators are closed due to 
> instruction reordering. I hope my guess is correct.
> {code:java}
> // RocksDBStore.java
> @Override
> public synchronized void close() {
> if (!open) {
> return;
> }
> open = false;
> closeOpenIterators();
> options.close();
> wOptions.close();
> fOptions.close();
> db.close();
> options = null;
> wOptions = null;
> fOptions = null;
> db = null;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8343) streams application crashed due to rocksdb

2019-05-08 Thread gaoshu (JIRA)
gaoshu created KAFKA-8343:
-

 Summary: streams application crashed due to rocksdb
 Key: KAFKA-8343
 URL: https://issues.apache.org/jira/browse/KAFKA-8343
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 1.0.0
 Environment: centos 7 jdk8 kafka-streams1.0
Reporter: gaoshu
 Attachments: fullsizeoutput_1.jpeg

My streams application always crashes within a few days. The crash log looks 
like https://github.com/facebook/rocksdb/issues/5234, so I think it may be 
because RocksDBStore.java is closed incorrectly under multithreading. Looking 
through the code below, db.close() should run after closeOpenIterators(). 
However, db.close() may be executed before the iterators are closed due to 
instruction reordering. I hope my guess is correct.
{code:java}
// RocksDBStore.java
@Override
public synchronized void close() {
if (!open) {
return;
}

open = false;
closeOpenIterators();
options.close();
wOptions.close();
fOptions.close();
db.close();

options = null;
wOptions = null;
fOptions = null;
db = null;
}
{code}
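For context, a hypothetical repro sketch of the use-after-close crash the 
linked RocksDB issue describes (assumes the org.rocksdb JNI bindings; the path 
and keys are placeholders): a live RocksIterator that outlives db.close() 
points at freed native memory, and the next use can crash the JVM.
{code:java}
// Hypothetical repro sketch, not Kafka code: a live iterator outliving
// db.close() touches freed native memory, as in rocksdb issue 5234.
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

public class IteratorAfterCloseRepro {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true)) {
            RocksDB db = RocksDB.open(options, "/tmp/repro-db"); // placeholder path
            db.put("k".getBytes(), "v".getBytes());
            RocksIterator it = db.newIterator(); // imagine another thread holding this
            db.close();                          // DB closed while iterator is still live
            it.seekToFirst();                    // use-after-free on native memory -> SIGSEGV
        }
    }
}
{code}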



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Boquan Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836075#comment-16836075
 ] 

Boquan Tang edited comment on KAFKA-8335 at 5/9/19 4:56 AM:


Hi [~hachikuji], thanks for replying.
 As Weichu commented, we have log.cleaner.delete.retention.ms = 86400000, which 
is one day. To better illustrate the suspected issue, I uploaded the full 
segment from April 25 [^seg_april_25.zip], which is two weeks before the time 
it was retrieved.
 The log dump shows that not only is the endTxnMarker not deleted, but the 
record batch metadata is also retained:
{code:java}
Dumping /home/boquan/Downloads/Users/boquan/Documents/003530931566.log
Starting offset: 3530931566
baseOffset: 3530931566 lastOffset: 3530931567 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 4001 producerEpoch: 2539 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 0 CreateTime: 1556161832882 
size: 61 magic: 2 compresscodec: NONE crc: 1683579819 isvalid: true
baseOffset: 3530931575 lastOffset: 3530931575 count: 1 baseSequence: -1 
lastSequence: -1 producerId: 4001 producerEpoch: 2539 partitionLeaderEpoch: 94 
isTransactional: true isControl: true position: 61 CreateTime: 1556161832899 
size: 78 magic: 2 compresscodec: NONE crc: 535474521 isvalid: true
| offset: 3530931575 CreateTime: 1556161832899 keysize: 4 valuesize: 6 
sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 84
baseOffset: 3530931576 lastOffset: 3530931577 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 3007 producerEpoch: 2516 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 139 CreateTime: 1556161832997 
size: 61 magic: 2 compresscodec: NONE crc: 3760382141 isvalid: true
baseOffset: 3530931578 lastOffset: 3530931579 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 1004 producerEpoch: 2576 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 200 CreateTime: 1556161832998 
size: 61 magic: 2 compresscodec: NONE crc: 3285369041 isvalid: true
baseOffset: 3530931580 lastOffset: 3530931581 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 1005 producerEpoch: 2545 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 261 CreateTime: 1556161832998 
size: 61 magic: 2 compresscodec: NONE crc: 1698037918 isvalid: true
baseOffset: 3530931582 lastOffset: 3530931583 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 3003 producerEpoch: 2529 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 322 CreateTime: 1556161832998 
size: 61 magic: 2 compresscodec: NONE crc: 3446788505 isvalid: true
baseOffset: 3530931584 lastOffset: 3530931585 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 3001 producerEpoch: 2486 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 383 CreateTime: 1556161832998 
size: 61 magic: 2 compresscodec: NONE crc: 2245471394 isvalid: true
baseOffset: 3530931586 lastOffset: 3530931587 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 3006 producerEpoch: 2503 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 444 CreateTime: 1556161832999 
size: 61 magic: 2 compresscodec: NONE crc: 1819109301 isvalid: true
baseOffset: 3530931588 lastOffset: 3530931588 count: 1 baseSequence: -1 
lastSequence: -1 producerId: 3007 producerEpoch: 2516 partitionLeaderEpoch: 94 
isTransactional: true isControl: true position: 505 CreateTime: 1556161833001 
size: 78 magic: 2 compresscodec: NONE crc: 2403915653 isvalid: true
| offset: 3530931588 CreateTime: 1556161833001 keysize: 4 valuesize: 6 
sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 95
baseOffset: 3530931589 lastOffset: 3530931589 count: 1 baseSequence: -1 
lastSequence: -1 producerId: 3003 producerEpoch: 2529 partitionLeaderEpoch: 94 
isTransactional: true isControl: true position: 583 CreateTime: 1556161833004 
size: 78 magic: 2 compresscodec: NONE crc: 4184380477 isvalid: true
| offset: 3530931589 CreateTime: 1556161833004 keysize: 4 valuesize: 6 
sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 95

{code}
Is this intended? If so, will all compacted topics grow without bound in the end?


was (Author: boquan):
Hi [~hachikuji], thanks for replying.
 As Weichu commented, we have log.cleaner.delete.retention.ms = 86400000, which 
is one day. To better illustrate the suspected issue, I uploaded the full 
segment from April 25 [^seg_april_25.zip], which is two weeks before the time 
it was retrieved.
 The log dump shows that not only is the endTxnMarker not deleted, but the 
record batch metadata is also retained:
{code:java}
Dumping /home/boquan/Downloads/Users/boquan/Documents/003530931566.log
Starting offset: 3530931566
baseOffset: 3530931566 lastOffset: 3530931567 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 4001 producerEpoch: 2539 partitionLeaderEpoch: 94 
isTransactional: true isControl: 

[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Boquan Tang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836075#comment-16836075
 ] 

Boquan Tang commented on KAFKA-8335:


Hi [~hachikuji], thanks for replying.
 As Weichu commented, we have log.cleaner.delete.retention.ms = 86400000, which 
is one day. To better illustrate the suspected issue, I uploaded the full 
segment from April 25 [^seg_april_25.zip], which is two weeks before the time 
it was retrieved.
 The log dump shows that not only is the endTxnMarker not deleted, but the 
record batch metadata is also retained:
{code:java}
Dumping /home/boquan/Downloads/Users/boquan/Documents/003530931566.log
Starting offset: 3530931566
baseOffset: 3530931566 lastOffset: 3530931567 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 4001 producerEpoch: 2539 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 0 CreateTime: 1556161832882 
size: 61 magic: 2 compresscodec: NONE crc: 1683579819 isvalid: true
baseOffset: 3530931575 lastOffset: 3530931575 count: 1 baseSequence: -1 
lastSequence: -1 producerId: 4001 producerEpoch: 2539 partitionLeaderEpoch: 94 
isTransactional: true isControl: true position: 61 CreateTime: 1556161832899 
size: 78 magic: 2 compresscodec: NONE crc: 535474521 isvalid: true
| offset: 3530931575 CreateTime: 1556161832899 keysize: 4 valuesize: 6 
sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 84
baseOffset: 3530931576 lastOffset: 3530931577 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 3007 producerEpoch: 2516 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 139 CreateTime: 1556161832997 
size: 61 magic: 2 compresscodec: NONE crc: 3760382141 isvalid: true
baseOffset: 3530931578 lastOffset: 3530931579 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 1004 producerEpoch: 2576 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 200 CreateTime: 1556161832998 
size: 61 magic: 2 compresscodec: NONE crc: 3285369041 isvalid: true
baseOffset: 3530931580 lastOffset: 3530931581 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 1005 producerEpoch: 2545 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 261 CreateTime: 1556161832998 
size: 61 magic: 2 compresscodec: NONE crc: 1698037918 isvalid: true
baseOffset: 3530931582 lastOffset: 3530931583 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 3003 producerEpoch: 2529 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 322 CreateTime: 1556161832998 
size: 61 magic: 2 compresscodec: NONE crc: 3446788505 isvalid: true
baseOffset: 3530931584 lastOffset: 3530931585 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 3001 producerEpoch: 2486 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 383 CreateTime: 1556161832998 
size: 61 magic: 2 compresscodec: NONE crc: 2245471394 isvalid: true
baseOffset: 3530931586 lastOffset: 3530931587 count: 0 baseSequence: 0 
lastSequence: 1 producerId: 3006 producerEpoch: 2503 partitionLeaderEpoch: 94 
isTransactional: true isControl: false position: 444 CreateTime: 1556161832999 
size: 61 magic: 2 compresscodec: NONE crc: 1819109301 isvalid: true
baseOffset: 3530931588 lastOffset: 3530931588 count: 1 baseSequence: -1 
lastSequence: -1 producerId: 3007 producerEpoch: 2516 partitionLeaderEpoch: 94 
isTransactional: true isControl: true position: 505 CreateTime: 1556161833001 
size: 78 magic: 2 compresscodec: NONE crc: 2403915653 isvalid: true
| offset: 3530931588 CreateTime: 1556161833001 keysize: 4 valuesize: 6 
sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 95
baseOffset: 3530931589 lastOffset: 3530931589 count: 1 baseSequence: -1 
lastSequence: -1 producerId: 3003 producerEpoch: 2529 partitionLeaderEpoch: 94 
isTransactional: true isControl: true position: 583 CreateTime: 1556161833004 
size: 78 magic: 2 compresscodec: NONE crc: 4184380477 isvalid: true
| offset: 3530931589 CreateTime: 1556161833004 keysize: 4 valuesize: 6 
sequence: -1 headerKeys: [] endTxnMarker: COMMIT coordinatorEpoch: 95

{code}

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent out a mail to the Kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of __consumer_offsets partitions 

[jira] [Assigned] (KAFKA-8311) Better consumer timeout exception handling

2019-05-08 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-8311:
--

Assignee: (was: Boyang Chen)

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit 
> timeout, we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or the stream level 
> to let the user know which consumer is having trouble.
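As a reference for point 1, a hedged sketch of the tuning the improved error 
message could point at (the Streams consumer prefix is standard; the timeout 
value is an arbitrary example, not a recommendation):
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TimeoutTuning {
    public static Properties props() {
        Properties props = new Properties();
        // Raise the underlying consumer's default.api.timeout.ms via the
        // Streams consumer prefix; 120000 ms is an illustrative value.
        props.put(StreamsConfig.consumerPrefix("default.api.timeout.ms"), "120000");
        return props;
    }
}
{code}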



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Boquan Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boquan Tang updated KAFKA-8335:
---
Attachment: seg_april_25.zip

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent out a mail to the Kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of the __consumer_offsets partitions grew huge.
> Some partitions went over 30G. This caused Kafka to take quite long to load
> the "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc.) are
> all preserved. It looks like, since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits every 100ms (commit.interval.ms=100 when
> exactly-once is enabled), so __consumer_offsets is growing really fast.
> Is this (keeping all transactions) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Boquan Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boquan Tang updated KAFKA-8335:
---
Attachment: (was: seg_april_25.zip)

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent out a mail to the Kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of the __consumer_offsets partitions grew huge.
> Some partitions went over 30G. This caused Kafka to take quite long to load
> the "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc.) are
> all preserved. It looks like, since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits every 100ms (commit.interval.ms=100 when
> exactly-once is enabled), so __consumer_offsets is growing really fast.
> Is this (keeping all transactions) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8284) Enable static membership on KStream

2019-05-08 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8284.

Resolution: Fixed

> Enable static membership on KStream
> ---
>
> Key: KAFKA-8284
> URL: https://issues.apache.org/jira/browse/KAFKA-8284
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling

2019-05-08 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836068#comment-16836068
 ] 

Boyang Chen commented on KAFKA-8311:


[~clearpal7] Thanks for your interest! You need to send an email to 
[d...@kafka.apache.org|mailto:d...@kafka.apache.org] to get access to the wiki 
and JIRA, and then I will assign this ticket to you.

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit 
> timeout, we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or the stream level 
> to let the user know which consumer is having trouble.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Boquan Tang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boquan Tang updated KAFKA-8335:
---
Attachment: seg_april_25.zip

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent out a mail to the Kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of the __consumer_offsets partitions grew huge.
> Some partitions went over 30G. This caused Kafka to take quite long to load
> the "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc.) are
> all preserved. It looks like, since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits every 100ms (commit.interval.ms=100 when
> exactly-once is enabled), so __consumer_offsets is growing really fast.
> Is this (keeping all transactions) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition

2019-05-08 Thread tdp (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836065#comment-16836065
 ] 

tdp commented on KAFKA-8339:


Hi Matthias,

Thanks for the quick reply! After digging into this a bit more, I think the 
issue is in our aggregation logic. We aggregate records and put the 
sub-aggregation result into the local state store. When the aggregation is 
"full" and the non-null value is returned by KSTREAM-TRANSFORMVALUES-NODE1, the 
transform node logic also deletes the sub-aggregation from the local state 
store. This is the flaw in our logic: only the last record from topic T1 is 
consumed by the new stream thread, and it does not see the sub-aggregation in 
the local state store because it was previously deleted. I was incorrectly 
assuming that the earlier records would have been re-consumed. The logs below 
explain in more detail.

I also should have specified that we are using a customized exception handler 
that looks like this:
{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

public class CustomLogAndFailExceptionHandler
        implements ProductionExceptionHandler, DeserializationExceptionHandler {
    // ... (configure(Map<String, ?>) and logging details omitted)

    // Called when an async produce fails after retries are exhausted.
    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record, Exception exception) {
        // ...
        return ProductionExceptionHandlerResponse.FAIL;
    }

    // Called when an inbound record cannot be deserialized.
    @Override
    public DeserializationHandlerResponse handle(
            ProcessorContext context, ConsumerRecord<byte[], byte[]> record,
            Exception exception) {
        // ...
        return DeserializationHandlerResponse.FAIL;
    }
}
{code}
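For reference, a hedged sketch of how such a handler is typically registered, 
assuming the standard Streams config keys:
{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class HandlerConfig {
    public static Properties props() {
        Properties props = new Properties();
        // Register the handler for both the produce and deserialization paths.
        props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                CustomLogAndFailExceptionHandler.class);
        props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                CustomLogAndFailExceptionHandler.class);
        return props;
    }
}
{code}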
For clarity, here's some selected logs (with additional custom debug/info logs 
added):

Record 1 (of 3) aggregated and sub-aggregation written to local state store but 
filtered out by KSTREAM-FILTER-NODE1:
{noformat}
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key 
!0d81fc45-f485-4676-901f-6c1ced7042b0 and offset 5 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding 
record to task 1_10 for topic TOPIC-T1 and partition 10 with key 
!0d81fc45-f485-4676-901f-6c1ced7042b0 and offset 5 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,951 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 
consuming key !0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,953 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-TRANSFORMVALUES-NODE1 processing key 
!0d81fc45-f485-4676-901f-6c1ced7042b0 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,954 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 
(StreamThread-15) Aggregator: Processed '1' of '3' messages for 
'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'.
07 May 2019 14:03:10,955 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-FILTER-NODE1 processing key !0d81fc45-f485-4676-901f-6c1ced7042b0 for 
request ID 554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
{noformat}
Record 2 (of 3) aggregated and sub-aggregation written to local state store but 
filtered out by KSTREAM-FILTER-NODE1:
{noformat}
07 May 2019 14:03:10,969 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamThread: stream-thread 
[StreamThread-18] Polled record for topic TOPIC-T1 and partition 10 with key 
!b5bc5c31-b676-483f-a0d3-4eeab7b0431c and offset 6 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,969 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.StreamTask: task [1_10] Adding 
record to task 1_10 for topic TOPIC-T1 and partition 10 with key 
!b5bc5c31-b676-483f-a0d3-4eeab7b0431c and offset 6 for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,971 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.SourceNode: KSTREAM-SOURCE-NODE1 
consuming key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,974 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-TRANSFORMVALUES-NODE1 processing key 
!b5bc5c31-b676-483f-a0d3-4eeab7b0431c for request ID 
554a65bc-e0bd-486d-abfa-ed3a3ac75af1.
07 May 2019 14:03:10,978 [INFO] 554a65bc-e0bd-486d-abfa-ed3a3ac75af1 10 
(StreamThread-15) Aggregator: Processed '2' of '3' messages for 
'c72d609c-2d8d-420f-99d0-b11593e32c981466642757'.
07 May 2019 14:03:10,981 [INFO] (StreamThread-18) 
org.apache.kafka.streams.processor.internals.ProcessorNode: 
KSTREAM-FILTER-NODE1 processing key !b5bc5c31-b676-483f-a0d3-4eeab7b0431c for 
request ID 

[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-05-08 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836058#comment-16836058
 ] 

Ryanne Dolan commented on KAFKA-7500:
-

[~cloudfrog], glad you are taking a look. I'm looking forward to hearing about 
your experience.

> Is this expected to work with kafka 1.1.1 clusters?

Yes, I believe it will work with 0.11.0.0 or higher, but maybe you can test it 
to verify :)

> will prefix remote topic names ... be configurable? 

Yes, you can implement your own ReplicationPolicy to define remote topics 
however you like:

 
{code:java}
replication.policy.class = my.SuffixReplicationPolicy
{code}
Also, MM2 doesn't care how existing source topics are named. If your topics are 
suffixed with their local DC (a common pattern), you can leave them as-is 
without breaking anything. By default you'd get topics like "dc1.topic1-dc1", 
so you might consider implementing a ReplicationPolicy that strips the suffix 
during replication so you get just "dc1.topic1".
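For what it's worth, a hedged sketch of what such a suffix policy could look 
like, written against the org.apache.kafka.connect.mirror.ReplicationPolicy 
interface from the MM2 branch; the class name matches the config example above, 
and the separator choice and null-for-local conventions are assumptions:
{code:java}
package my;

import org.apache.kafka.connect.mirror.ReplicationPolicy;

// Hedged sketch, not part of MM2: renders remote topics as "topic-cluster"
// (suffix) instead of the default "cluster.topic" (prefix).
public class SuffixReplicationPolicy implements ReplicationPolicy {
    private static final String SEPARATOR = "-"; // assumption: '-' as separator

    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic + SEPARATOR + sourceClusterAlias;  // "topic1" from dc1 -> "topic1-dc1"
    }

    @Override
    public String topicSource(String topic) {
        int i = topic.lastIndexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(i + 1);   // source cluster alias, null if local
    }

    @Override
    public String upstreamTopic(String topic) {
        int i = topic.lastIndexOf(SEPARATOR);
        return i < 0 ? null : topic.substring(0, i);    // original topic name, null if local
    }
}
{code}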

Ryanne

 

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Weichu Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836061#comment-16836061
 ] 

Weichu Liu commented on KAFKA-8335:
---

Hi, I uploaded a sample segment here: [^segment.zip]

And here are the broker settings on our Kafka:

{noformat}
[2019-05-07 02:00:50,472] INFO KafkaConfig values:
advertised.host.name = null
advertised.listeners = null
advertised.port = null
alter.config.policy.class.name = null
alter.log.dirs.replication.quota.window.num = 11
alter.log.dirs.replication.quota.window.size.seconds = 1
authorizer.class.name =
auto.create.topics.enable = true
auto.leader.rebalance.enable = true
background.threads = 10
broker.id = 1
broker.id.generation.enable = true
broker.rack = null
client.quota.callback.class = null
compression.type = producer
connection.failed.authentication.delay.ms = 100
connections.max.idle.ms = 600000
connections.max.reauth.ms = 0
control.plane.listener.name = null
controlled.shutdown.enable = true
controlled.shutdown.max.retries = 3
controlled.shutdown.retry.backoff.ms = 5000
controller.socket.timeout.ms = 30000
create.topic.policy.class.name = null
default.replication.factor = 3
delegation.token.expiry.check.interval.ms = 3600000
delegation.token.expiry.time.ms = 86400000
delegation.token.master.key = null
delegation.token.max.lifetime.ms = 604800000
delete.records.purgatory.purge.interval.requests = 1
delete.topic.enable = true
fetch.purgatory.purge.interval.requests = 1000
group.initial.rebalance.delay.ms = 3000
group.max.session.timeout.ms = 300000
group.max.size = 2147483647
group.min.session.timeout.ms = 6000
host.name =
inter.broker.listener.name = null
inter.broker.protocol.version = 2.2-IV1
kafka.metrics.polling.interval.secs = 10
kafka.metrics.reporters = []
leader.imbalance.check.interval.seconds = 300
leader.imbalance.per.broker.percentage = 10
listener.security.protocol.map = 
PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listeners = PLAINTEXT://:9092,SSL://:9093
log.cleaner.backoff.ms = 15000
log.cleaner.dedupe.buffer.size = 134217728
log.cleaner.delete.retention.ms = 86400000
log.cleaner.enable = true
log.cleaner.io.buffer.load.factor = 0.9
log.cleaner.io.buffer.size = 524288
log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
log.cleaner.min.cleanable.ratio = 0.5
log.cleaner.min.compaction.lag.ms = 0
log.cleaner.threads = 1
log.cleanup.policy = [delete]
log.dir = /tmp/kafka-logs
log.dirs = /var/lib/kafka/data
log.flush.interval.messages = 9223372036854775807
log.flush.interval.ms = null
log.flush.offset.checkpoint.interval.ms = 60000
log.flush.scheduler.interval.ms = 9223372036854775807
log.flush.start.offset.checkpoint.interval.ms = 60000
log.index.interval.bytes = 4096
log.index.size.max.bytes = 10485760
log.message.downconversion.enable = true
log.message.format.version = 2.2-IV1
log.message.timestamp.difference.max.ms = 9223372036854775807
log.message.timestamp.type = CreateTime
log.preallocate = false
log.retention.bytes = -1
log.retention.check.interval.ms = 300000
log.retention.hours = 168
log.retention.minutes = null
log.retention.ms = null
log.roll.hours = 168
log.roll.jitter.hours = 0
log.roll.jitter.ms = null
log.roll.ms = null
log.segment.bytes = 1073741824
log.segment.delete.delay.ms = 60000
max.connections.per.ip = 2147483647
max.connections.per.ip.overrides =
max.incremental.fetch.session.cache.slots = 1000
message.max.bytes = 2097152
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
min.insync.replicas = 1
num.io.threads = 8
num.network.threads = 3
num.partitions = 5
num.recovery.threads.per.data.dir = 1
num.replica.alter.log.dirs.threads = null
num.replica.fetchers = 1
offset.metadata.max.bytes = 4096
offsets.commit.required.acks = -1
offsets.commit.timeout.ms = 5000
offsets.load.buffer.size = 5242880
offsets.retention.check.interval.ms = 600000
offsets.retention.minutes = 10080
offsets.topic.compression.codec = 0
offsets.topic.num.partitions = 50
offsets.topic.replication.factor = 3
offsets.topic.segment.bytes = 

[jira] [Updated] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Weichu Liu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weichu Liu updated KAFKA-8335:
--
Attachment: segment.zip

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Priority: Major
> Attachments: segment.zip
>
>
> My colleague Weichu already sent out a mail to the Kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of the __consumer_offsets partitions grew huge.
> Some partitions went over 30G. This caused Kafka to take quite long to load
> the "__consumer_offsets" topic on startup (it loads the topic in order to
> become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc.) are
> all preserved. It looks like, since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits every 100ms (commit.interval.ms=100 when
> exactly-once is enabled), so __consumer_offsets is growing really fast.
> Is this (keeping all transactions) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-05-08 Thread Dan Casey (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836033#comment-16836033
 ] 

Dan Casey commented on KAFKA-7500:
--

I'm excited to test this as well. We've been fighting with duplicates for a 
long time, and wasting a lot of resources dealing with duplicate detection and 
remediation.

I have a few questions about the initial release.
 # Is this expected to work with kafka 1.1.1 clusters? This is currently the 
max version we can upgrade to without breaking support for Vertica integration.
 # KIP-382 mentions that you will prefix remote topic names to avoid 
replication loops. Would this be configurable? I am already using a similar 
approach by adding the local datacenter as a suffix to my topics, although it 
requires the producer to be datacenter-aware and prevents creating aggregate 
topics without help from the app. It would be great to see options to 
transform destination topic names.

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-08 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-8342:
---
Labels: newbie  (was: )

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> We have seen customers who need to deploy their application to a production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask the admin team to manually create those topics before 
> starting the actual stream job. We could add an admin tool to help them get 
> through the process quicker by providing a command that could:
>  # Read through the current stream topology
>  # Create the corresponding topics as needed, even including output topics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-08 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-8342:
---
Issue Type: New Feature  (was: Bug)

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Priority: Major
>
> We have seen customers who need to deploy their application to a production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask the admin team to manually create those topics before 
> starting the actual stream job. We could add an admin tool to help them get 
> through the process quicker by providing a command that could:
>  # Read through the current stream topology
>  # Create the corresponding topics as needed, even including output topics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling

2019-05-08 Thread WooYoung (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16836028#comment-16836028
 ] 

WooYoung commented on KAFKA-8311:
-

Hello [~bchen225242]. I'm a new contributor to the Kafka project.

I want to analyze this issue and contribute to the project.

Could you please assign this issue to me?

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit 
> timeout, we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or the stream level 
> to let the user know which consumer is having trouble.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-08 Thread Boyang Chen (JIRA)
Boyang Chen created KAFKA-8342:
--

 Summary: Admin tool to setup Kafka Stream topology (internal) 
topics
 Key: KAFKA-8342
 URL: https://issues.apache.org/jira/browse/KAFKA-8342
 Project: Kafka
  Issue Type: Bug
Reporter: Boyang Chen


We have seen customers who need to deploy their application to a production 
environment but don't have access to create changelog and repartition topics. 
They need to ask the admin team to manually create those topics before 
starting the actual stream job. We could add an admin tool to help them get 
through the process quicker by providing a command that could:
 # Read through the current stream topology
 # Create the corresponding topics as needed, even including output topics (a 
sketch of this step follows below).
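A hedged sketch of what the topic-creation step could look like with the Admin 
API; the topic name, partition/replication settings, and broker address are 
illustrative assumptions:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

// Hypothetical sketch: pre-create a compacted changelog topic so the Streams
// application does not need topic-creation permissions at startup.
public class CreateStreamInternalTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic changelog = new NewTopic("my-app-store-changelog", 50, (short) 3)
                    .configs(Collections.singletonMap(
                            TopicConfig.CLEANUP_POLICY_CONFIG,
                            TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singletonList(changelog)).all().get();
        }
    }
}
{code}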



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8286) KIP-460 Admin Leader Election RPC

2019-05-08 Thread Jose Armando Garcia Sancio (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-8286:
--
Description: 
Tracking issue for implementing KIP-460. Tasks:

# [Done] Design KIP
# [Done] Review KIP
# [Done] Approve KIP
# [Done] Update RPC to support KIP
# [Done] Update controller to support KIP
# [Done] Create CLI command (kafka-leader-election) that implements the KIP
# [Done] Search and replace any usage of “preferred” in the code
# Add test for command
# Add test for controller functionality
# Revisit all of the documentation - generate and audit the new javadocs
# Deprecated since... needs to be updated
# Review PR
# Merge PR

  was:
Tracking issue for implementing KIP-460. Tasks:

# [Done] Design KIP
# [Done] Review KIP
# [Done] Approve KIP
# [Done] Update RPC to support KIP
# [Done] Update controller to support KIP
# [Done] Create CLI command (kafka-leader-election) that implements the KIP
# Search and replace any usage of “preferred” in the code
# Add test for command
# Add test for controller functionality
# Revisit all of the documentation - generate and audit the new javadocs
# Review PR
# Merge PR


> KIP-460 Admin Leader Election RPC
> -
>
> Key: KAFKA-8286
> URL: https://issues.apache.org/jira/browse/KAFKA-8286
> Project: Kafka
>  Issue Type: New Feature
>  Components: admin, clients, core
>Reporter: Jose Armando Garcia Sancio
>Assignee: Jose Armando Garcia Sancio
>Priority: Major
>
> Tracking issue for implementing KIP-460. Tasks:
> # [Done] Design KIP
> # [Done] Review KIP
> # [Done] Approve KIP
> # [Done] Update RPC to support KIP
> # [Done] Update controller to support KIP
> # [Done] Create CLI command (kafka-leader-election) that implements the KIP
> # [Done] Search and replace any usage of “preferred” in the code
> # Add test for command
> # Add test for controller functionality
> # Revisit all of the documentation - generate and audit the new javadocs
> # Deprecated since... needs to be updated
> # Review PR
> # Merge PR



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7830) Convert Kafka RPCs to use automatically generated code

2019-05-08 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-7830.

Resolution: Fixed

> Convert Kafka RPCs to use automatically generated code
> --
>
> Key: KAFKA-7830
> URL: https://issues.apache.org/jira/browse/KAFKA-7830
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> KAFKA-7609 added a way of automatically generating code for reading and 
> writing Kafka RPC message types from JSON schemas.
> Automatically generated code is preferable to manually written serialization 
> code:
> * It is less tedious and error-prone to use than hand-written code.
> * For developers writing Kafka clients in other languages, the JSON schemas 
> are useful in a way that the java serialization code is not.
> * It will eventually be possible to automatically validate aspects of 
> cross-version compatibility, when using JSON message schemas.
> * Once all of the RPCs are converted, we can drop using Structs in favor of 
> serializing directly to ByteBuffer, to reduce GC load.
> This JIRA tracks converting the current hand-written message serialization 
> code to automatically generated serialization.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (KAFKA-7830) Convert Kafka RPCs to use automatically generated code

2019-05-08 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reopened KAFKA-7830:


> Convert Kafka RPCs to use automatically generated code
> --
>
> Key: KAFKA-7830
> URL: https://issues.apache.org/jira/browse/KAFKA-7830
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> KAFKA-7609 added a way of automatically generating code for reading and 
> writing Kafka RPC message types from JSON schemas.
> Automatically generated code is preferable to manually written serialization 
> code:
> * It is less tedious and error-prone to use than hand-written code.
> * For developers writing Kafka clients in other languages, the JSON schemas 
> are useful in a way that the java serialization code is not.
> * It will eventually be possible to automatically validate aspects of 
> cross-version compatibility, when using JSON message schemas.
> * Once all of the RPCs are converted, we can drop using Structs in favor of 
> serializing directly to ByteBuffer, to reduce GC load.
> This JIRA tracks converting the current hand-written message serialization 
> code to automatically generated serialization.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8196) Replace InitProducerId request/response with automated protocol

2019-05-08 Thread Boyang Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-8196.

Resolution: Fixed

> Replace InitProducerId request/response with automated protocol
> ---
>
> Key: KAFKA-8196
> URL: https://issues.apache.org/jira/browse/KAFKA-8196
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Jason Gustafson
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition

2019-05-08 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835931#comment-16835931
 ] 

Matthias J. Sax commented on KAFKA-8339:


{quote}The race condition occurs when the stream thread commits its offsets for 
topic T1 after it consumes some or all of the necessary records from topic T1 
for an aggregation but before it gets the failure response back from the async 
produce kicked off by KSTREAM-SINK-NODE1.
{quote}
This does not sound correct. Before offsets are committed, `producer.flush()` 
is called and all pending in-flight requests should be written. Committing 
offsets should only happen if no error occurred during `flush()`. Can you 
confirm your observation? If your description is correct, we need to figure 
out why we still commit even if an error occurred.

Also, offsets are not committed async but via `consumer.commitSync()`.
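(For illustration, a hedged sketch of that ordering with plain clients, not the 
actual StreamTask code: the send callback records any failure, flush() drains 
in-flight requests, and offsets are committed only after a clean flush.)
{code:java}
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

// Illustrative sketch of flush-before-commit; names and structure are
// assumptions, not the Streams implementation.
public class FlushThenCommit {
    static void produceThenCommit(KafkaProducer<byte[], byte[]> producer,
                                  KafkaConsumer<byte[], byte[]> consumer,
                                  ProducerRecord<byte[], byte[]> out,
                                  Map<TopicPartition, OffsetAndMetadata> offsets) {
        AtomicReference<Exception> sendError = new AtomicReference<>();
        producer.send(out, (metadata, exception) -> {
            if (exception != null) sendError.compareAndSet(null, exception);
        });
        producer.flush();                 // blocks until in-flight sends complete
        if (sendError.get() != null) {    // a failed send must prevent the commit
            throw new RuntimeException("send failed; not committing", sendError.get());
        }
        consumer.commitSync(offsets);     // commit only after a clean flush
    }
}
{code}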
{quote}LogAndFailExceptionHandler
{quote}
This handler is for the input path, and should only be called if there is a 
deserialization exception. Thus, I don't see how it is related to the other 
things reported here? Can you elaborate?
{quote}so when the stream thread tries to commit the next time it fails and the 
stream thread shuts itself down.
{quote}
I don't see the causality? Why would committing fail? Also, what error message 
do you see on a failed commit?

> At-least-once delivery guarantee seemingly not met due to async commit / 
> produce failure race condition
> ---
>
> Key: KAFKA-8339
> URL: https://issues.apache.org/jira/browse/KAFKA-8339
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: tdp
>Priority: Major
>
> We have hit a race condition several times now where the StreamThread commits 
> its offsets for a task before the task has fully processed the record through 
> the topology.
>  
> Consider part of a topology that looks like this:
>  
> TOPIC T1 -> KSTREAM-SOURCE-NODE1 -> KSTREAM-TRANSFORMVALUES-NODE1 -> 
> KSTREAM-FILTER-NODE1 -> KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 -> TOPIC 
> T2
>  
> Records are committed to topic T1. KSTREAM-SOURCE-NODE1 consumes these 
> records from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates these records 
> using a local state store. KSTREAM-TRANSFORMVALUES-NODE1 returns null if not 
> all necessary records from topic T1 have been consumed yet or an object 
> representing an aggregation of records if all necessary records from topic T1 
> have been consumed. KSTREAM-FILTER-NODE1 then filters out anything that is 
> null. Only an aggregation of records is passed to the KSTREAM-MAPVALUES-NODE1 
> node. KSTREAM-MAPVALUES-NODE1 then maps the aggregation of records into 
> another object type. KSTREAM-SINK-NODE1 then attempts to produce this other 
> object to topic T2.
>  
> The race condition occurs when the stream thread commits its offsets for 
> topic T1 after it consumes some or all of the necessary records from topic T1 
> for an aggregation but before it gets the failure response back from the 
> async produce kicked off by KSTREAM-SINK-NODE1.
>  
> We are running with a LogAndFailExceptionHandler, so when the stream thread 
> tries to commit the next time it fails and the stream thread shuts itself 
> down. The stream task is then reassigned to another stream thread, which 
> reads the offsets previously committed by the original stream thread. That 
> means the new stream thread's KSTREAM-SOURCE-NODE1 will never be able to 
> consume the messages required for the aggregation and the KSTREAM-SINK-NODE1 
> will never end up producing the required records to topic T2. This is why it 
> seems the at-least-once delivery guarantee is not met - KSTREAM-SINK-NODE1 
> never successfully processed records and the stream application continued on 
> past it.
> Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10, which 
> increases the likelihood of occurrence of the issue when all retries fail 
> since it widens the window at which the async offset commit can occur before 
> the produce record request is marked as failed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8341) AdminClient should retry coordinator lookup after NOT_COORDINATOR error

2019-05-08 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-8341:
--

 Summary: AdminClient should retry coordinator lookup after 
NOT_COORDINATOR error
 Key: KAFKA-8341
 URL: https://issues.apache.org/jira/browse/KAFKA-8341
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Vikas Singh


If a group operation (e.g. DescribeGroup) fails because the coordinator has 
moved, the AdminClient should look up the coordinator again before retrying the 
operation. Currently we will either fail or just retry anyway. This is similar 
in some ways to controller rediscovery after getting NOT_CONTROLLER errors.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8340) ServiceLoader fails when used from isolated plugin path directory

2019-05-08 Thread Chris Egerton (JIRA)
Chris Egerton created KAFKA-8340:


 Summary: ServiceLoader fails when used from isolated plugin path 
directory
 Key: KAFKA-8340
 URL: https://issues.apache.org/jira/browse/KAFKA-8340
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Chris Egerton


Under some circumstances, the {{ServiceLoader.load}} mechanism will fail when 
used from an isolated plugin path directory and return an incomplete (often 
empty) {{ServiceLoader}} instance.

 

To replicate:
 * Include a {{META-INF/services/...}} file in one of the JARS located in a 
plugin's directory with one or more implementations of that service listed 
inside. For the sake of example, let's say the name of this service is 
{{com.example.MyService}}
 * Program that plugin to invoke 
{{ServiceLoader.load(com.example.MyService.class)}}
 * Start the Connect framework, making sure this plugin is included on the 
plugin path and that it somehow invokes the {{ServiceLoader.load(...)}} method
 * Observe that the services loaded by that invocation do not include the ones 
described in the {{META-INF/services/...}} file contained in the JAR in the 
plugin's directory

 

This is because the 
[ServiceLoader.load(Class)|https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html#load(java.lang.Class)]
 method uses the current thread's context classloader to locate resources and 
load services. The current thread's context classloader is, in most cases, an 
instance of {{DelegatingClassLoader}}, which will (unless asked to locate 
resources corresponding to a provider-configuration file for a REST extension 
or config provider) simply delegate resource location to the parent and, unless 
asked to locate a class for a recognized plugin, also delegate class loading to 
the parent. Thus, none of the plugin's JARs are scanned for either 
provider-configuration files or for actual service classes.

A viable workaround for some cases is to instead use the 
[ServiceLoader.load(Class, 
ClassLoader)|https://docs.oracle.com/javase/7/docs/api/java/util/ServiceLoader.html#load(java.lang.Class,%20java.lang.ClassLoader)]
 method, specifying the current class's classloader as the second argument. 
This causes the plugin's {{PluginClassLoader}}, which scans all JARs in the 
plugin's directory, to be used to locate resources and classes.

However, this may not be feasible in all cases, especially when working with 
external libraries to which this workaround is difficult or impossible to 
apply.
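
Where the workaround is feasible, a minimal sketch looks like this 
({{com.example.MyService}} being the hypothetical service interface from the 
example above):

{code:java}
import java.util.ServiceLoader;
import com.example.MyService;

public class PluginServiceLookup {
    public static ServiceLoader<MyService> loadServices() {
        // Pass this class's own classloader (the PluginClassLoader when running
        // inside an isolated plugin) instead of relying on the thread context
        // classloader, so the plugin's JARs are scanned for
        // provider-configuration files and service classes.
        return ServiceLoader.load(MyService.class,
                                  PluginServiceLookup.class.getClassLoader());
    }
}
{code}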



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8339) At-least-once delivery guarantee seemingly not met due to async commit / produce failure race condition

2019-05-08 Thread tdp (JIRA)
tdp created KAFKA-8339:
--

 Summary: At-least-once delivery guarantee seemingly not met due to 
async commit / produce failure race condition
 Key: KAFKA-8339
 URL: https://issues.apache.org/jira/browse/KAFKA-8339
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.1
Reporter: tdp


We have hit a race condition several times now where the StreamThread commits 
its offsets for a task before the task has fully processed the record through 
the topology.
 
Consider part of a topology that looks like this:
 
TOPIC T1 -> KSTREAM-SOURCE-NODE1 -> KSTREAM-TRANSFORMVALUES-NODE1 -> 
KSTREAM-FILTER-NODE1 -> KSTREAM-MAPVALUES-NODE1 -> KSTREAM-SINK-NODE1 -> TOPIC T2
 
Records are committed to topic T1. KSTREAM-SOURCE-NODE1 consumes these records 
from topic T1. KSTREAM-TRANSFORMVALUES-NODE1 aggregates these records using a 
local state store. KSTREAM-TRANSFORMVALUES-NODE1 returns null if not all 
necessary records from topic T1 have been consumed yet or an object 
representing an aggregation of records if all necessary records from topic T1 
have been consumed. KSTREAM-FILTER-NODE1 then filters out anything that is 
null. Only an aggregation of records is passed to the KSTREAM-MAPVALUES-NODE1 
node. KSTREAM-MAPVALUES-NODE1 then maps the aggregation of records into another 
object type. KSTREAM-SINK-NODE1 then attempts to produce this other object to 
topic T2.
 
The race condition occurs when the stream thread commits its offsets for topic 
T1 after it consumes some or all of the necessary records from topic T1 for an 
aggregation but before it gets the failure response back from the async produce 
kicked off by KSTREAM-SINK-NODE1.
 
We are running with a LogAndFailExceptionHandler, so when the stream thread 
tries to commit the next time it fails and the stream thread shuts itself down. 
The stream task is then reassigned to another stream thread, which reads the 
offsets previously committed by the original stream thread. That means the new 
stream thread's KSTREAM-SOURCE-NODE1 will never be able to consume the messages 
required for the aggregation and the KSTREAM-SINK-NODE1 will never end up 
producing the required records to topic T2. This is why it seems the 
at-least-once delivery guarantee is not met - KSTREAM-SINK-NODE1 never 
successfully processed records and the stream application continued on past it.

Note: we are running with StreamsConfig.RETRIES_CONFIG set to 10, which 
increases the likelihood of the issue occurring when all retries fail, since 
it widens the window during which the async offset commit can occur before the 
produce record request is marked as failed.
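
For reference, a minimal Streams DSL sketch of the topology shape described 
above; the store name, value types, and the three-part completeness check are 
illustrative stand-ins, not our actual code:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class RaceTopologySketch {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("agg-store"),
                Serdes.String(), Serdes.String()));
        builder.stream("T1", Consumed.with(Serdes.String(), Serdes.String()))
               // KSTREAM-TRANSFORMVALUES-NODE1: returns null until all parts arrive
               .transformValues(() -> new ValueTransformerWithKey<String, String, String>() {
                   private KeyValueStore<String, String> agg;
                   @SuppressWarnings("unchecked")
                   @Override
                   public void init(ProcessorContext context) {
                       agg = (KeyValueStore<String, String>) context.getStateStore("agg-store");
                   }
                   @Override
                   public String transform(String key, String value) {
                       String soFar = agg.get(key);
                       String joined = soFar == null ? value : soFar + "|" + value;
                       if (joined.split("\\|").length >= 3) { // illustrative "complete" test
                           agg.delete(key);
                           return joined;                     // aggregation complete
                       }
                       agg.put(key, joined);
                       return null; // incomplete aggregation, dropped by the filter
                   }
                   @Override
                   public void close() { }
               }, "agg-store")
               .filter((k, v) -> v != null)      // KSTREAM-FILTER-NODE1
               .mapValues(String::toUpperCase)   // KSTREAM-MAPVALUES-NODE1 (illustrative)
               .to("T2", Produced.with(Serdes.String(), Serdes.String()));
        return builder;
    }
}
{code}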



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-08 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835832#comment-16835832
 ] 

John Roesler commented on KAFKA-8315:
-

Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your 
investigation is progressing.

Last question first: Streams should choose to consume from the left or right 
based on which one has the lower timestamp in the next record, so I would not 
expect one side to "run ahead" of the other. There's one caveat, that when one 
side is being produced more slowly, Streams won't just wait indefinitely for 
the next data, but instead just process the side that does have data. This is 
controlled by the "max idle ms" config, but since you're processing 
historically, this shouldn't be your problem. Still might be worth a look.

Maybe for debugging purposes, you can print out the key, value, and timestamp 
for each of the sides as well as in the joiner, so you can identify which side 
is triggering the join, and evaluate whether or not it's correctly time-ordered.
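
A minimal sketch of that with {{peek}} (topic wiring and types are 
assumptions; the record timestamp itself would need a small transformer that 
reads {{context.timestamp()}}):

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

// assuming KStream<String, String> left, right already built from the two topics
left = left.peek((k, v) -> System.out.println("left:  " + k + " = " + v));
right = right.peek((k, v) -> System.out.println("right: " + k + " = " + v));
KStream<String, String> joined = left.join(right,
        (lv, rv) -> {
            System.out.println("join fired: " + lv + " + " + rv);
            return lv + "/" + rv;
        },
        JoinWindows.of(Duration.ofHours(24)).grace(Duration.ofHours(24)));
{code}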

If it is in fact running ahead on one side, despite what it should be doing, 
this would explain why you see better results with a larger grace period. To 
confirm, the grace period should only matter up to the maximum time skew in 
your stream. So, as you said, if you have two producers that each produce a 
full 24 hours of data, sequentially, then you should see stream time advance 
when the first producer writes its data, and then "freeze" while the second 
producer writes its (out-of-order) data. Thus, you'll want to set the grace 
period to keep old windows around for at least 24 hours, since you know you 
have to wait for that second producer's data.

Finally, to answer your earlier questions, yes, each task is handling just one 
partition of both input topics (the same partition on the left and right). 
Stream Time is independently maintained for each task/partition, and it is 
computed simply as the highest timestamp yet observed for that partition. If 
you want to look at it in detail, it's tracked in 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore . 
Actually, you can set that class's logger to DEBUG mode and it'll print out 
every time it skips a record that is outside of retention.
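
For example, with a log4j.properties entry along these lines (logger name 
taken from the class above):

{code}
# Surface the "skipping record" messages mentioned above
log4j.logger.org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore=DEBUG
{code}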

Minor point, you should not need to mess with the retention of the changelog 
topic. Streams sets this appropriately to preserve the same data as the store, 
but this is only apparent when restoring the store. The actual results of the 
join are served out of the state store, so only the state store's retention 
matters. This is what you're setting with the grace period.

I hope this helps!
-John

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to support it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-08 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835832#comment-16835832
 ] 

John Roesler edited comment on KAFKA-8315 at 5/8/19 7:17 PM:
-

Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your 
investigation is progressing.

Last question first: Streams should choose to consume from the left or right 
based on which one has the lower timestamp in the next record, so I would not 
expect one side to "run ahead" of the other. There's one caveat, that when one 
side is being produced more slowly, Streams won't just wait indefinitely for 
the next data, but instead just process the side that does have data. This is 
controlled by the "max idle ms" config, but since you're processing 
historically, this shouldn't be your problem. Still might be worth a look.

Maybe for debugging purposes, you can print out the key, value, and timestamp 
for each of the sides as well as in the joiner, so you can identify which side 
is triggering the join, and evaluate whether or not it's correctly time-ordered.

If it is in fact running ahead on one side, despite what it should be doing, 
this would explain why you see better results with a larger grace period. To 
confirm, the grace period should only matter up to the maximum time skew in 
your stream. So, as you said, if you have two producers that each produce a 
full 24 hours of data, sequentially, then you should see stream time advance 
when the first producer writes its data, and then "freeze" while the second 
producer writes its (out-of-order) data. Thus, you'll want to set the grace 
period to keep old windows around for at least 24 hours, since you know you 
have to wait for that second producer's data.

Finally, to answer your earlier questions, yes, each task is handling just one 
partition of both input topics (the same partition on the left and right). 
Stream Time is independently maintained for each task/partition, and it is 
computed simply as the highest timestamp yet observed for that partition. If 
you want to look at it in detail, it's tracked in 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore . 
Actually, you can set that class's logger to DEBUG mode and it'll print out 
every time it skips a record that is outside of retention.

Minor point, you should not need to mess with the retention of the changelog 
topic. Streams sets this appropriately to preserve the same data as the store, 
but this is only apparent when restoring the store. The actual results of the 
join are served out of the state store, so only the state store's retention 
matters. This is what you're setting with the grace period.

I hope this helps!
-John


was (Author: vvcephei):
Hi [~the4thamigo_uk], Sorry for leaving you hanging a bit. I'm glad your 
investigation is progressing.

Last question first: Streams should choose to consume from the left or right 
based on which one has the lower timestamp in the next record, so I would not 
expect one side to "run ahead" of the other. There's one caveat, that when one 
side is being produced more slowly, Streams won't just wait indefinitely for 
the next data, but instead just process the side that does have data. This is 
controlled by the "max idle ms" config, but since you're processing 
historically, this shouldn't be your problem. Still might be worth a look.

Maybe for debugging purposes, you can print out the key, value, and timestamp 
for each of the sides as well as in the joiner, so you can identify which side 
is triggering the join, and evaluate whether or not it's correctly time-ordered.

If it is in fact running ahead on one side, despite what it should be doing, 
this would explain why you see better results with a larger grace period. To 
confirm, the grace period should only matter up to the maximum time skew in 
your stream. So, as you said, if you have two producers that each produce a 
full 24 hours of data, sequentially, then you should see stream time advance 
when the first producer writes its data, and then "freeze" while the second 
producer writes its (out-of-order) data. Thus, you'll want to set the grace 
period to keep old windows around for at least 24 hours, since you know you 
have to wait for that second producer's data.

Finally, to answer your earlier questions, yes, each task is handling just one 
partition of both input topics (the same partition on the left and right). 
Stream Time is independently maintained for each task/partition, and it is 
computed simply as the highest timestamp yet observed for that partition. If 
you want to look at it in detail, it's tracked in 
org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore . 
Actually, you can set that class's logger to DEBUG mode and it'll print out 
every time it skips a record that is outside of retention.

Minor point, you should not need to mess with the retention of the changelog 
topic. Streams sets this appropriately to preserve the same data as the store, 
but this is only apparent when restoring the store. The actual results of the 
join are served out of the state store, so only the state store's retention 
matters. This is what you're setting with the grace period.

I hope this helps!
-John

[jira] [Resolved] (KAFKA-8051) remove KafkaMbean when network close

2019-05-08 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8051?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8051.

Resolution: Duplicate

> remove KafkaMbean when network close
> 
>
> Key: KAFKA-8051
> URL: https://issues.apache.org/jira/browse/KAFKA-8051
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: limeng
>Priority: Critical
> Fix For: 2.2.2
>
>
> the broker server will run out of memory (OOM) when
>  * a large number of clients frequently close and reconnect
>  * the clientId changes on every reconnect, which gives rise to too many
> KafkaMbeans in the broker
> the reason is that the broker forgets to remove the KafkaMbean when it
> detects that the connection has closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8049) remove KafkaMbean when network close

2019-05-08 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8049.

Resolution: Duplicate

> remove KafkaMbean when network close
> 
>
> Key: KAFKA-8049
> URL: https://issues.apache.org/jira/browse/KAFKA-8049
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: limeng
>Priority: Critical
> Fix For: 2.2.2
>
>
> the broker server will run out of memory (OOM) when
>  * a large number of clients frequently close and reconnect
>  * the clientId changes on every reconnect, which gives rise to too many
> KafkaMbeans in the broker
> the reason is that the broker forgets to remove the KafkaMbean when it
> detects that the connection has closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8048) remove KafkaMbean when network close

2019-05-08 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8048.

Resolution: Duplicate

> remove KafkaMbean when network close
> 
>
> Key: KAFKA-8048
> URL: https://issues.apache.org/jira/browse/KAFKA-8048
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2
>Reporter: limeng
>Priority: Critical
> Fix For: 2.2.2
>
>
> the broker server will run out of memory (OOM) when
>  * a large number of clients frequently close and reconnect
>  * the clientId changes on every reconnect, which gives rise to too many
> KafkaMbeans in the broker
> the reason is that the broker forgets to remove the KafkaMbean when it
> detects that the connection has closed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8338) Improve consumer offset expiration logic to take subscription into account

2019-05-08 Thread Gwen Shapira (JIRA)
Gwen Shapira created KAFKA-8338:
---

 Summary: Improve consumer offset expiration logic to take 
subscription into account
 Key: KAFKA-8338
 URL: https://issues.apache.org/jira/browse/KAFKA-8338
 Project: Kafka
  Issue Type: Improvement
Reporter: Gwen Shapira


Currently, we expire consumer offsets for a group after the group is considered 
gone.

There is a case where the consumer group still exists, but is now subscribed to 
different topics. In that case, the offsets of the old topics will never expire 
and if lag is monitored, the monitors will show ever-growing lag on those 
topics. 

We need to improve the logic to expire the consumer offsets if the consumer 
group hasn't subscribed to specific topics/partitions for long enough.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8326) Add List Serde

2019-05-08 Thread Daniyar Yeralin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniyar Yeralin updated KAFKA-8326:
---
Description: 
_This ticket proposes adding new ListSerializer and ListDeserializer classes as 
well as support for the new classes into the Serdes class. This will allow 
using List Serde of type T directly from Consumers, Producers and Streams._

_List serialization and deserialization will be done through repeatedly 
calling a serializer/deserializer for each entry provided by passed generic T's 
Serde. For example, if you want to create List of Strings serde, then 
serializer/deserializer of StringSerde will be used to serialize/deserialize 
each entry in `List<String>`._

I believe there are many use cases where List Serde could be used. Ex. 
[https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
 
[https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]

For instance, aggregate grouped (by key) values together in a list to do other 
subsequent operations on the collection.

KIP Link: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]
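
A minimal usage sketch of the proposal; note {{ListSerde}} is the class this 
ticket proposes (it does not exist in released Kafka yet), and the connection 
properties are illustrative:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;

public class ListSerdeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "list-serde-demo");

        // Proposed API: a List<String> serde built from the inner String serde
        Serde<List<String>> listSerde = new ListSerde<>(Serdes.String());

        KafkaConsumer<String, List<String>> consumer = new KafkaConsumer<>(
                props, Serdes.String().deserializer(), listSerde.deserializer());
        consumer.subscribe(Collections.singletonList("my-list-topic"));
    }
}
{code}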

  was:
I propose _adding new ListSerializer and ListDeserializer classes as well as 
support for the new classes into the Serdes class. This will allow using 
List Serde directly from Consumers, Producers and Streams._

_List serialization and deserialization will be done through repeatedly 
calling a serializer/deserializer for each entry provided by passed generic T's 
Serde._

_For example, if you want to create List of Strings serde, you will have to 
declare `new ListSerde<>(Serdes.String())`, in this case 
serializer/deserializer of String Serde will be used to serialize/deserialize 
each entry in `List<String>`._

I believe there are many use cases where List Serde could be used. Ex. 
[https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
 
[https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]

For instance, aggregate grouped (by key) values together in a list to do other 
subsequent operations on the collection.

 

KIP Link: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]


> Add List Serde
> -
>
> Key: KAFKA-8326
> URL: https://issues.apache.org/jira/browse/KAFKA-8326
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Daniyar Yeralin
>Assignee: Daniyar Yeralin
>Priority: Minor
>  Labels: kip
>
> _This ticket proposes adding new ListSerializer and ListDeserializer classes 
> as well as support for the new classes into the Serdes class. This will allow 
> using List Serde of type T directly from Consumers, Producers and Streams._
> _List serialization and deserialization will be done through repeatedly 
> calling a serializer/deserializer for each entry provided by passed generic 
> T's Serde. For example, if you want to create List of Strings serde, then 
> serializer/deserializer of StringSerde will be used to serialize/deserialize 
> each entry in `List<String>`._
> I believe there are many use cases where List Serde could be used. Ex. 
> [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
>  
> [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]
> For instance, aggregate grouped (by key) values together in a list to do 
> other subsequent operations on the collection.
> KIP Link: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-08 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835784#comment-16835784
 ] 

Andrew edited comment on KAFKA-8315 at 5/8/19 5:54 PM:
---

After a lot of investigation, we think this issue is down to the fact that we 
have a left stream with minute-by-minute data and a right topic with daily data.

It is not clear what logic controls the rate at which records are read from 
left and right streams, but we believe that the right topic is being read at a 
rate such that it quickly gets too far ahead of the left stream (in terms 
of event-time), as there are far fewer records, and therefore the right stream 
windows are being expired before the left stream data has been read.

[~vvcephei] What controls the rate that records are read from the left and 
right streams? Is there any guarantee that the timestamps of the records read 
from the left stream are kept more-or-less in line with those of the records 
from the right stream?

If not, is there any way we can somehow delay the right stream?

 

Thanks for your help above.

 


was (Author: the4thamigo_uk):
After a lot of investigation, we think this issue is down to the fact that we 
have a left stream with minute-by-minute data and a right topic with daily data.

It is not clear what logic controls the rate at which records are read from 
left and right streams, but we believe that the right topic is being read at a 
rate such that it quickly gets too far ahead of the left stream (in terms 
of event-time) and therefore the right stream windows are being expired before 
the left stream data has been read.

[~vvcephei] What controls the rate that records are read from the left and 
right streams? Is there any guarantee that the timestamps of the records read 
from the left stream are kept more-or-less in line with those of the records 
from the right stream?

If not, is there any way we can somehow delay the right stream?

 

Thanks for your help above.

 

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to support it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-08 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835784#comment-16835784
 ] 

Andrew commented on KAFKA-8315:
---

After a lot of investigation, we think this issue is down to the fact that we 
have a left stream with minute-by-minute data and a right topic with daily data.

It is not clear what logic controls the rate at which records are read from 
left and right streams, but we believe that the right topic is being read at a 
rate such that it quickly gets too far ahead of the left stream (in terms 
of event-time) and therefore the right stream windows are being expired before 
the left stream data has been read.

[~vvcephei] What controls the rate that records are read from the left and 
right streams? Is there any guarantee that the timestamps of the records read 
from the left stream are kept more-or-less in line with those of the records 
from the right stream?

If not, is there any way we can somehow delay the right stream?

 

Thanks for your help above.

 

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to support it.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8326) Add List Serde

2019-05-08 Thread Daniyar Yeralin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniyar Yeralin updated KAFKA-8326:
---
Description: 
I propose _adding new ListSerializer and ListDeserializer classes as well as 
support for the new classes into the Serdes class. This will allow using 
List Serde directly from Consumers, Producers and Streams._

_List serialization and deserialization will be done through repeatedly 
calling a serializer/deserializer for each entry provided by passed generic T's 
Serde._

_For example, if you want to create List of Strings serde, you will have to 
declare `new ListSerde<>(Serdes.String())`, in this case 
serializer/deserializer of String Serde will be used to serialize/deserialize 
each entry in `List<String>`._

I believe there are many use cases where List Serde could be used. Ex. 
[https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
 
[https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]

For instance, aggregate grouped (by key) values together in a list to do other 
subsequent operations on the collection.

 

KIP Link: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]

  was:
I propose _adding new ListSerializer and ListDeserializer classes as well as 
support for the new classes into the Serdes class. This will allow using 
List Serde directly from Consumers, Producers and Streams._

_List serialization and deserialization will be done through repeatedly 
calling a serializer/deserializer for each entry provided by passed generic T's 
Serde._ 

_For example, if you want to create List of Strings serde, you will have to 
declare `new Serdes.ListSerde<>(Serdes.String(), 
Comparator.comparing(String::length))`, in this case serializer/deserializer of 
String Serde will be used to serialize/deserialize each entry in 
`List<String>`._

I believe there are many use cases where List Serde could be used. Ex. 
[https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
 
[https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]

For instance, aggregate grouped (by key) values together in a list to do other 
subsequent operations on the collection.

 

KIP Link: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]


> Add List Serde
> -
>
> Key: KAFKA-8326
> URL: https://issues.apache.org/jira/browse/KAFKA-8326
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Daniyar Yeralin
>Assignee: Daniyar Yeralin
>Priority: Minor
>  Labels: kip
>
> I propose _adding new ListSerializer and ListDeserializer classes as well as 
> support for the new classes into the Serdes class. This will allow using 
> List Serde directly from Consumers, Producers and Streams._
> _List serialization and deserialization will be done through repeatedly 
> calling a serializer/deserializer for each entry provided by passed generic 
> T's Serde._
> _For example, if you want to create List of Strings serde, you will have to 
> declare `new ListSerde<>(Serdes.String())`, in this case 
> serializer/deserializer of String Serde will be used to serialize/deserialize 
> each entry in `List<String>`._
> I believe there are many use cases where List Serde could be used. Ex. 
> [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
>  
> [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]
> For instance, aggregate grouped (by key) values together in a list to do 
> other subsequent operations on the collection.
>  
> KIP Link: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer

2019-05-08 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7320.

   Resolution: Fixed
Fix Version/s: 2.3.0

> Provide ability to disable auto topic creation in KafkaConsumer
> ---
>
> Key: KAFKA-7320
> URL: https://issues.apache.org/jira/browse/KAFKA-7320
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
> Fix For: 2.3.0
>
>
> Consumers should have a configuration to control whether subscribing to 
> non-existent topics should automatically create the topic or not.
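
A sketch of the resulting consumer setting (per KIP-361; the property name is 
the one merged for 2.3.0):

{code}
# Consumer config: do not auto-create subscribed topics that don't exist yet
allow.auto.create.topics=false
{code}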



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7320) Provide ability to disable auto topic creation in KafkaConsumer

2019-05-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835742#comment-16835742
 ] 

ASF GitHub Bot commented on KAFKA-7320:
---

hachikuji commented on pull request #5542: KAFKA-7320: Add consumer 
configuration to disable auto topic creation
URL: https://github.com/apache/kafka/pull/5542
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Provide ability to disable auto topic creation in KafkaConsumer
> ---
>
> Key: KAFKA-7320
> URL: https://issues.apache.org/jira/browse/KAFKA-7320
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> Consumers should have a configuration to control whether subscribing to 
> non-existent topics should automatically create the topic or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835720#comment-16835720
 ] 

Jason Gustafson commented on KAFKA-8335:


Thanks for the report. Can you provide your broker configuration? COMMIT 
markers should be cleaned as soon as all the data from the transaction has also 
been deleted. It can be delayed by as long as `delete.retention.ms` though. It 
might be helpful if you provide the full dump of the segment.
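
For reference, a sketch of how to check that setting on the topic with the 
2.2-era tooling (the ZooKeeper address is an assumption):

{code}
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics \
  --entity-name __consumer_offsets --describe
# look for delete.retention.ms (default 86400000, i.e. 24h)
{code}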

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Priority: Major
>
> My Colleague Weichu already sent out a mail to kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket tracking it.
> We are using Kafka Streams with exactly-once enabled on a Kafka cluster for
> a while.
> Recently we found that the size of __consumer_offsets partitions grew huge.
> Some partitions went over 30G. This caused Kafka to take quite a long time
> to load the "__consumer_offsets" topic on startup (it loads the topic in
> order to become group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc) are
> all preserved. Looks like that since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits every 100ms (commit.interval.ms=100 when
> exactly-once is enabled) so __consumer_offsets is growing really fast.
> Is this (keeping all transaction markers) by design, or is it a bug in the
> LogCleaner? What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8337) Fix tests/setup.cfg to work with pytest 4.x

2019-05-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8337?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835682#comment-16835682
 ] 

ASF GitHub Bot commented on KAFKA-8337:
---

sekikn commented on pull request #6701: KAFKA-8337: Fix tests/setup.cfg to work 
with pytest 4.x
URL: https://github.com/apache/kafka/pull/6701
 
 
   This PR replaces [pytest] section in tests/setup.cfg with [tool:pytest] so 
that the unit tests for the system tests work with pytest 4.x.
   I ran `python setup.py test` and confirmed it succeeded.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix tests/setup.cfg to work with pytest 4.x
> ---
>
> Key: KAFKA-8337
> URL: https://issues.apache.org/jira/browse/KAFKA-8337
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests, unit tests
>Reporter: Kengo Seki
>Priority: Minor
>
> In accordance with tests/README.md, I ran {{`python setup.py test`}} in the 
> {{tests}} directory to execute unit tests for the system tests, but got the 
> following error:
> {code}
> $ python setup.py test
> running test
> (snip)
> Using /home/sekikn/repo/kafka/tests/.eggs/docutils-0.14-py2.7.egg
> Searching for pytest
> Best match: pytest 4.4.1
> Processing pytest-4.4.1-py2.7.egg
> (snip)
> builtins.Failed: [pytest] section in setup.cfg files is no longer supported, 
> change to [tool:pytest] instead.
> {code}
> This is because [\[pytest\] section in setup.cfg has been removed in pytest 
> 4.0|https://docs.pytest.org/en/4.1.0/deprecations.html#pytest-section-in-setup-cfg-files].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8337) Fix tests/setup.cfg to work with pytest 4.x

2019-05-08 Thread Kengo Seki (JIRA)
Kengo Seki created KAFKA-8337:
-

 Summary: Fix tests/setup.cfg to work with pytest 4.x
 Key: KAFKA-8337
 URL: https://issues.apache.org/jira/browse/KAFKA-8337
 Project: Kafka
  Issue Type: Bug
  Components: system tests, unit tests
Reporter: Kengo Seki


In accordance with tests/README.md, I ran {{`python setup.py test`}} in the 
{{tests}} directory to execute unit tests for the system tests, but got the 
following error:

{code}
$ python setup.py test
running test

(snip)

Using /home/sekikn/repo/kafka/tests/.eggs/docutils-0.14-py2.7.egg
Searching for pytest
Best match: pytest 4.4.1
Processing pytest-4.4.1-py2.7.egg

(snip)

builtins.Failed: [pytest] section in setup.cfg files is no longer supported, 
change to [tool:pytest] instead.

{code}

This is because [\[pytest\] section in setup.cfg has been removed in pytest 
4.0|https://docs.pytest.org/en/4.1.0/deprecations.html#pytest-section-in-setup-cfg-files].
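
The fix amounts to a one-line rename in tests/setup.cfg; sketched below with 
the surrounding options elided:

{code}
# before (rejected by pytest 4.x):
# [pytest]
# after:
[tool:pytest]
{code}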



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7847) KIP-421: Automatically resolve external configurations.

2019-05-08 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram updated KAFKA-7847:
--
Fix Version/s: 2.3.0

> KIP-421: Automatically resolve external configurations.
> ---
>
> Key: KAFKA-7847
> URL: https://issues.apache.org/jira/browse/KAFKA-7847
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: TEJAL ADSUL
>Priority: Minor
> Fix For: 2.3.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> This proposal intends to enhance the AbstractConfig base class to support 
> replacing variables in configurations just prior to parsing and validation. 
> This simple change will make it easy for client applications, Kafka Connect, 
> and Kafka Streams to use shared code to incorporate externalized secrets and 
> other variable replacements within their configurations.
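
As a rough sketch of the kind of variable replacement involved, building on 
the KIP-297 ConfigProvider syntax (the file path and key name are 
hypothetical):

{code}
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
ssl.keystore.password=${file:/etc/kafka/secrets.properties:keystore-password}
{code}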



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read

2019-05-08 Thread arthur (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835621#comment-16835621
 ] 

arthur commented on KAFKA-7757:
---

Hello, we are also facing the same issue running kafka 2.2.0:

ls /proc/61020/fd | wc -l
165651

kafka-topics --version
2.2.0-cp2 (Commit:325e9879cbc6d612)

link to thread dump: [^dump.txt]

> Too many open files after java.io.IOException: Connection to n was 
> disconnected before the response was read
> 
>
> Key: KAFKA-7757
> URL: https://issues.apache.org/jira/browse/KAFKA-7757
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Pedro Gontijo
>Priority: Major
> Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, 
> fd-spike-threads.txt, kafka-allocated-file-handles.png, server.properties, 
> td1.txt, td2.txt, td3.txt
>
>
> We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers)
> After a while (hours) 2 brokers start to throw:
> {code:java}
> java.io.IOException: Connection to NN was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> File descriptors start to pile up and if I do not restart it throws "Too many 
> open files" and crashes.  
> {code:java}
> ERROR Error while accepting connection (kafka.network.Acceptor)
> java.io.IOException: Too many open files in system
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:460)
> at kafka.network.Acceptor.run(SocketServer.scala:403)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  After some hours the issue happens again... It has happened with all 
> brokers, so it is not something specific to an instance.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7757) Too many open files after java.io.IOException: Connection to n was disconnected before the response was read

2019-05-08 Thread arthur (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7757?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

arthur updated KAFKA-7757:
--
Attachment: dump.txt

> Too many open files after java.io.IOException: Connection to n was 
> disconnected before the response was read
> 
>
> Key: KAFKA-7757
> URL: https://issues.apache.org/jira/browse/KAFKA-7757
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Pedro Gontijo
>Priority: Major
> Attachments: Screen Shot 2019-01-03 at 12.20.38 PM.png, dump.txt, 
> fd-spike-threads.txt, kafka-allocated-file-handles.png, server.properties, 
> td1.txt, td2.txt, td3.txt
>
>
> We upgraded from 0.10.2.2 to 2.1.0 (a cluster with 3 brokers)
> After a while (hours) 2 brokers start to throw:
> {code:java}
> java.io.IOException: Connection to NN was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
> at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
> at 
> kafka.server.AbstractFetcherThread.kafka$server$AbstractFetcherThread$$processFetchRequest(AbstractFetcherThread.scala:241)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:130)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$maybeFetch$1.apply(AbstractFetcherThread.scala:129)
> at scala.Option.foreach(Option.scala:257)
> at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
> at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> File descriptors start to pile up and if I do not restart it throws "Too many 
> open files" and crashes.  
> {code:java}
> ERROR Error while accepting connection (kafka.network.Acceptor)
> java.io.IOException: Too many open files in system
> at sun.nio.ch.ServerSocketChannelImpl.accept0(Native Method)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:422)
> at sun.nio.ch.ServerSocketChannelImpl.accept(ServerSocketChannelImpl.java:250)
> at kafka.network.Acceptor.accept(SocketServer.scala:460)
> at kafka.network.Acceptor.run(SocketServer.scala:403)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
>  After some hours the issue happens again... It has happened with all 
> brokers, so it is not something specific to an instance.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8195) Unstable Producer After Send Failure in Transaction

2019-05-08 Thread Viktor Somogyi-Vass (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835601#comment-16835601
 ] 

Viktor Somogyi-Vass edited comment on KAFKA-8195 at 5/8/19 1:30 PM:


Tried to reproduce the second flavor with your code, and from the debug level 
logs it seems like the first transaction won't be finished, so the attempts of 
the second producer are interpreted as concurrent transactions:
{noformat}
[2019-05-08 15:19:16,293] DEBUG Completed 
request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, 
clientId=producer-3, correlationId=1725) -- 
{transactional_id=txd-,transaction_timeout_ms=6},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1}
 from connection 
172.30.64.130:9091-172.30.64.130:58192-14;totalTime:0.241,requestQueueTime:0.06,localTime:0.085,remoteTime:0.0,throttleTime:0.035,responseQueueTime:0.052,sendTime:0.056,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
{noformat}
This causes the client to re-enqueue the InitProducerId request until it times 
out. I guess the best option would be to detect this situation in the brokers 
and remove that glitched "transaction", but I need to look into this more.


was (Author: viktorsomogyi):
Tried to reproduce the second flavor with your code, and from the debug level 
logs it seems like the first transaction won't be finished, so the attempts of 
the second producer are interpreted as concurrent transactions:
{noformat}
[2019-05-08 15:19:16,293] DEBUG Completed 
request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, 
clientId=producer-3, correlationId=1725) -- 
{transactional_id=txd-,transaction_timeout_ms=6},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1}
 from connection 
172.30.64.130:9091-172.30.64.130:58192-14;totalTime:0.241,requestQueueTime:0.06,localTime:0.085,remoteTime:0.0,throttleTime:0.035,responseQueueTime:0.052,sendTime:0.056,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT
 (kafka.request.logger)
{noformat}
This causes the client to re-enqueue the InitProducerId request until it times 
out. I guess the best option would be to detect this situation in the brokers 
and remove that glitched "transaction", but I need to look into this more.

> Unstable Producer After Send Failure in Transaction
> ---
>
> Key: KAFKA-8195
> URL: https://issues.apache.org/jira/browse/KAFKA-8195
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.1, 2.2.0
>Reporter: Gary Russell
>Priority: Blocker
>
> This journey started with [this Stack Overflow question | 
> https://stackoverflow.com/questions/55510898].
> I easily reproduced his issue (see my comments on his question).
> My first step was to take Spring out of the picture and replicate the issue 
> with the native {{Producer}} apis. The following code shows the result; I 
> have attached logs and stack traces.
> There are four methods in the test; the first performs 2 transactions, 
> successfully, on the same {{Producer}} instance.
> The second aborts 2 transactions, successfully, on the same {{Producer}} 
> instance - application level failures after performing a send.
> There are two flavors of the problem:
> The third attempts to send 2 messages, on the same {{Producer}} that are too 
> large for the topic; the first aborts as expected; the second send hangs in 
> {{abortTransaction}} after getting a {{TimeoutException}} when attempting to 
> {{get}} the send metadata. See log {{hang.same.producer.log}} - it also 
> includes the stack trace showing the hang.
> The fourth test is similar to the third but it closes the producer after the 
> first failure; this time, we timeout in {{initTransactions()}}.
> Subsequent executions of this test get the timeout on the first attempt - 
> that {{transactional.id}} seems to be unusable. Removing the logs was one way 
> I found to get past the problem.
> Test code
> {code:java}
>   public ApplicationRunner runner(AdminClient client, 
> DefaultKafkaProducerFactory pf) {
>   return args -> {
>   try {
>   Map<String, Object> configs = new 
> HashMap<>(pf.getConfigurationProperties());
>   int committed = testGoodTx(client, configs);
>   System.out.println("Successes (same producer) 
> committed: " + committed);
>   int rolledBack = testAppFailureTx(client, 
> configs);
>   System.out.println("App failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // first flavor - hung thread in 
> 

[jira] [Updated] (KAFKA-8195) Unstable Producer After Send Failure in Transaction

2019-05-08 Thread Viktor Somogyi-Vass (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-8195:
---
Affects Version/s: 2.3.0

> Unstable Producer After Send Failure in Transaction
> ---
>
> Key: KAFKA-8195
> URL: https://issues.apache.org/jira/browse/KAFKA-8195
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.1, 2.2.0, 2.3.0
>Reporter: Gary Russell
>Assignee: Viktor Somogyi-Vass
>Priority: Blocker
>
> This journey started with [this Stack Overflow question | 
> https://stackoverflow.com/questions/55510898].
> I easily reproduced his issue (see my comments on his question).
> My first step was to take Spring out of the picture and replicate the issue 
> with the native {{Producer}} apis. The following code shows the result; I 
> have attached logs and stack traces.
> There are four methods in the test; the first performs 2 transactions, 
> successfully, on the same {{Producer}} instance.
> The second aborts 2 transactions, successfully, on the same {{Producer}} 
> instance - application level failures after performing a send.
> There are two flavors of the problem:
> The third attempts to send 2 messages, on the same {{Producer}} that are too 
> large for the topic; the first aborts as expected; the second send hangs in 
> {{abortTransaction}} after getting a {{TimeoutException}} when attempting to 
> {{get}} the send metadata. See log {{hang.same.producer.log}} - it also 
> includes the stack trace showing the hang.
> The fourth test is similar to the third but it closes the producer after the 
> first failure; this time, we timeout in {{initTransactions()}}.
> Subsequent executions of this test get the timeout on the first attempt - 
> that {{transactional.id}} seems to be unusable. Removing the logs was one way 
> I found to get past the problem.
> Test code
> {code:java}
>   public ApplicationRunner runner(AdminClient client, 
> DefaultKafkaProducerFactory pf) {
>   return args -> {
>   try {
>   Map<String, Object> configs = new 
> HashMap<>(pf.getConfigurationProperties());
>   int committed = testGoodTx(client, configs);
>   System.out.println("Successes (same producer) 
> committed: " + committed);
>   int rolledBack = testAppFailureTx(client, 
> configs);
>   System.out.println("App failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // first flavor - hung thread in 
> abortTransaction()
> //rolledBack = 
> testSendFailureTxSameProducer(client, configs);
> //System.out.println("Send failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // second flavor - timeout in initTransactions()
>   rolledBack = 
> testSendFailureTxNewProducer(client, configs);
>   System.out.println("Send failures (new 
> producer) rolled back: " + rolledBack);
>   }
>   catch (Exception e) {
>   e.printStackTrace();
>   }
>   };
>   }
>   private int testGoodTx(AdminClient client, Map<String, Object> configs)
>   throws ExecutionException {
>   int commits = 0;
>   NewTopic topic = TopicBuilder.name("so55510898a")
>   .partitions(1)
>   .replicas(1)
>   .build();
>   createTopic(client, topic);
>   configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-");
>   Producer producer = new 
> KafkaProducer<>(configs);
>   try {
>   producer.initTransactions();
>   for (int i = 0; i < 2; i++) {
>   producer.beginTransaction();
>   RecordMetadata recordMetadata = producer.send(
>   new 
> ProducerRecord<>("so55510898a", "foo")).get(10, 
> TimeUnit.SECONDS);
>   System.out.println(recordMetadata);
>   producer.commitTransaction();
>   commits++;
>   }
>   }
>   catch (ProducerFencedException | OutOfOrderSequenceException | 
> AuthorizationException e) {
>   // We can't recover from these exceptions, so our only 
> option is to close the producer and exit.
>   }

[jira] [Assigned] (KAFKA-8195) Unstable Producer After Send Failure in Transaction

2019-05-08 Thread Viktor Somogyi-Vass (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass reassigned KAFKA-8195:
--

Assignee: Viktor Somogyi-Vass

> Unstable Producer After Send Failure in Transaction
> ---
>
> Key: KAFKA-8195
> URL: https://issues.apache.org/jira/browse/KAFKA-8195
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.1, 2.2.0
>Reporter: Gary Russell
>Assignee: Viktor Somogyi-Vass
>Priority: Blocker
>
> This journey started with [this Stack Overflow question | 
> https://stackoverflow.com/questions/55510898].
> I easily reproduced his issue (see my comments on his question).
> My first step was to take Spring out of the picture and replicate the issue 
> with the native {{Producer}} apis. The following code shows the result; I 
> have attached logs and stack traces.
> There are four methods in the test; the first performs 2 transactions, 
> successfully, on the same {{Producer}} instance.
> The second aborts 2 transactions, successfully, on the same {{Producer}} 
> instance - application level failures after performing a send.
> There are two flavors of the problem:
> The third attempts to send 2 messages, on the same {{Producer}} that are too 
> large for the topic; the first aborts as expected; the second send hangs in 
> {{abortTransaction}} after getting a {{TimeoutException}} when attempting to 
> {{get}} the send metadata. See log {{hang.same.producer.log}} - it also 
> includes the stack trace showing the hang.
> The fourth test is similar to the third but it closes the producer after the 
> first failure; this time, we timeout in {{initTransactions()}}.
> Subsequent executions of this test get the timeout on the first attempt - 
> that {{transactional.id}} seems to be unusable. Removing the logs was one way 
> I found to get past the problem.
> Test code
> {code:java}
>   public ApplicationRunner runner(AdminClient client, 
> DefaultKafkaProducerFactory pf) {
>   return args -> {
>   try {
>   Map<String, Object> configs = new 
> HashMap<>(pf.getConfigurationProperties());
>   int committed = testGoodTx(client, configs);
>   System.out.println("Successes (same producer) 
> committed: " + committed);
>   int rolledBack = testAppFailureTx(client, 
> configs);
>   System.out.println("App failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // first flavor - hung thread in 
> abortTransaction()
> //rolledBack = 
> testSendFailureTxSameProducer(client, configs);
> //System.out.println("Send failures (same 
> producer) rolled back: " + rolledBack);
>   
>   // second flavor - timeout in initTransactions()
>   rolledBack = 
> testSendFailureTxNewProducer(client, configs);
>   System.out.println("Send failures (new 
> producer) rolled back: " + rolledBack);
>   }
>   catch (Exception e) {
>   e.printStackTrace();
>   }
>   };
>   }
>   private int testGoodTx(AdminClient client, Map configs)
>   throws ExecutionException {
>   int commits = 0;
>   NewTopic topic = TopicBuilder.name("so55510898a")
>   .partitions(1)
>   .replicas(1)
>   .build();
>   createTopic(client, topic);
>   configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txa-");
>   Producer producer = new 
> KafkaProducer<>(configs);
>   try {
>   producer.initTransactions();
>   for (int i = 0; i < 2; i++) {
>   producer.beginTransaction();
>   RecordMetadata recordMetadata = producer.send(
>   new 
> ProducerRecord<>("so55510898a", "foo")).get(10, 
> TimeUnit.SECONDS);
>   System.out.println(recordMetadata);
>   producer.commitTransaction();
>   commits++;
>   }
>   }
>   catch (ProducerFencedException | OutOfOrderSequenceException | 
> AuthorizationException e) {
>   // We can't recover from these exceptions, so our only 
> option is to close the producer and exit.
> 

[jira] [Commented] (KAFKA-8195) Unstable Producer After Send Failure in Transaction

2019-05-08 Thread Viktor Somogyi-Vass (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835601#comment-16835601
 ] 

Viktor Somogyi-Vass commented on KAFKA-8195:


I tried to reproduce the second flavor with your code, and from the debug-level 
logs it seems like the first transaction is never finished, so the attempts of 
the second producer are interpreted as concurrent transactions:
{noformat}
[2019-05-08 15:19:16,293] DEBUG Completed request:RequestHeader(apiKey=INIT_PRODUCER_ID, apiVersion=1, clientId=producer-3, correlationId=1725) -- {transactional_id=txd-,transaction_timeout_ms=6},response:{throttle_time_ms=0,error_code=51,producer_id=-1,producer_epoch=-1} from connection 172.30.64.130:9091-172.30.64.130:58192-14;totalTime:0.241,requestQueueTime:0.06,localTime:0.085,remoteTime:0.0,throttleTime:0.035,responseQueueTime:0.052,sendTime:0.056,securityProtocol:PLAINTEXT,principal:User:ANONYMOUS,listener:PLAINTEXT (kafka.request.logger)
{noformat}
This causes the client to re-enqueue the InitProducerId request until it times 
out. I guess the best approach would be to detect this situation in the brokers 
and remove that glitched "transaction", but I need to look into this more.
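
For readers following along, below is a minimal, self-contained sketch of this 
second flavor against the plain producer API. It assumes a local broker at 
localhost:9092 and a topic (here called so55510898b) whose max.message.bytes is 
below ~2 MB so the oversized send is rejected broker-side; the class name, 
topic name, and sizes are illustrative, not taken from the original test:
{code:java}
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SecondFlavorRepro {

    public static void main(String[] args) throws Exception {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // raise the client-side limit so the record is rejected by the broker,
        // not by the producer itself
        configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 5_000_000);
        // both producers share the same transactional.id - that is the point
        configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txd-");

        Producer<String, String> first = new KafkaProducer<>(configs);
        first.initTransactions();
        first.beginTransaction();
        try {
            char[] huge = new char[2_000_000];
            Arrays.fill(huge, 'x');
            first.send(new ProducerRecord<>("so55510898b", new String(huge)))
                    .get(10, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            // close without abortTransaction(), mimicking the fourth test
            // method; the broker still considers the transaction in flight
            first.close();
        }

        // the broker keeps answering CONCURRENT_TRANSACTIONS (error_code=51)
        // to InitProducerId, so this call blocks and eventually times out
        Producer<String, String> second = new KafkaProducer<>(configs);
        second.initTransactions();
        second.close();
    }
}
{code}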

> Unstable Producer After Send Failure in Transaction
> ---
>
> Key: KAFKA-8195
> URL: https://issues.apache.org/jira/browse/KAFKA-8195
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.1, 2.2.0
>Reporter: Gary Russell
>Priority: Blocker
>
> This journey started with [this Stack Overflow question | 
> https://stackoverflow.com/questions/55510898].
> I easily reproduced his issue (see my comments on his question).
> My first step was to take Spring out of the picture and replicate the issue 
> with the native {{Producer}} apis. The following code shows the result; I 
> have attached logs and stack traces.
> There are four methods in the test; the first performs 2 transactions, 
> successfully, on the same {{Producer}} instance.
> The second aborts 2 transactions, successfully, on the same {{Producer}} 
> instance - application level failures after performing a send.
> There are two flavors of the problem:
> The third attempts to send 2 messages, on the same {{Producer}} that are too 
> large for the topic; the first aborts as expected; the second send hangs in 
> {{abortTransaction}} after getting a {{TimeoutException}} when attempting to 
> {{get}} the send metadata. See log {{hang.same.producer.log}} - it also 
> includes the stack trace showing the hang.
> The fourth test is similar to the third, but it closes the producer after the 
> first failure; this time, we time out in {{initTransactions()}}.
> Subsequent executions of this test get the timeout on the first attempt - 
> that {{transactional.id}} seems to be unusable. Removing the logs was one way 
> I found to get past the problem.
> Test code
> {code:java}
>   public ApplicationRunner runner(AdminClient client, DefaultKafkaProducerFactory<String, String> pf) {
>       return args -> {
>           try {
>               Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
>               int committed = testGoodTx(client, configs);
>               System.out.println("Successes (same producer) committed: " + committed);
>               int rolledBack = testAppFailureTx(client, configs);
>               System.out.println("App failures (same producer) rolled back: " + rolledBack);
>
>               // first flavor - hung thread in abortTransaction()
>               //rolledBack = testSendFailureTxSameProducer(client, configs);
>               //System.out.println("Send failures (same producer) rolled back: " + rolledBack);
>
>               // second flavor - timeout in initTransactions()
>               rolledBack = testSendFailureTxNewProducer(client, configs);
>               System.out.println("Send failures (new producer) rolled back: " + rolledBack);
>           }
>           catch (Exception e) {
>               e.printStackTrace();
>           }
>       };
>   }
>
>   private int testGoodTx(AdminClient client, Map<String, Object> configs) throws ExecutionException {
>       int commits = 0;
>       NewTopic topic = TopicBuilder.name("so55510898a")
>               .partitions(1)
>               .replicas(1)
>               .build();
>       createTopic(client, topic);
>   

[jira] [Commented] (KAFKA-8106) Reducing the allocation and copying of ByteBuffer when logValidator do validation.

2019-05-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835563#comment-16835563
 ] 

ASF GitHub Bot commented on KAFKA-8106:
---

ijuma commented on pull request #6476: KAFKA-8106:Reducing the allocation and 
copying of ByteBuffer  when logValidator  do validation.
URL: https://github.com/apache/kafka/pull/6476
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reducing the allocation and copying of ByteBuffer  when logValidator  do 
> validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1
> Environment: Server : 
> cpu:2*16 ; 
> MemTotal : 256G;
> Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection ; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
>       We did performance testing of Kafka in the specific scenarios described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. Then we started many producer processes to 
> send large amounts of messages to one of the topics in each test.
> *_Specific Scenario_*
>   
>  *_1.Main config of Kafka_*  
>  # Main config of the Kafka server: num.network.threads=6; num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartitions: 50~2000
>  # Size of a single message: 1024B
>  
>  *_2.Config of KafkaProducer_* 
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3.The best result of performance testing_*  
> ||Network inflow rate||CPU used (%)||Disk write speed||Production throughput||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4.Phenomenon and my doubt_*
>     _The upper limit of CPU usage has been reached, but the upper limit of the 
> server's network bandwidth has not. *We want to find out what costs so much 
> CPU time, so that we can improve performance and reduce the CPU usage of the 
> Kafka server.*_
> _*5.Analysis*_
>         We analyzed JFR recordings of the Kafka server taken during the 
> performance testing. After checking and re-running the performance test, we 
> located the code "*ByteBuffer recordBuffer = 
> ByteBuffer.allocate(sizeOfBodyInBytes);* (*Class: DefaultRecord, Function: 
> readFrom()*)", which consumed CPU resources and caused a lot of GC. Our 
> modified code reduces the allocation and copying of ByteBuffer, so the test 
> performance is greatly improved, and the CPU's stable usage is *below 60%*. 
> The following is a comparison of the test performance of the two code versions 
> under the same conditions.
> *Result of performance testing*
> *Main config of Kafka: Single message: 1024B; TopicPartitions: 200; linger.ms: 1000ms.*
> ||Code version||Network inflow rate||CPU (%)||Messages/s||
> |Source code|600M/s|97%|25,000,000|
> |Modified code|1GB/s|<60%|41,660,000|
> **1. Before modified code (source code) GC:**
> ![](https://i.loli.net/2019/05/07/5cd16df163ad3.png)
> **2. After modified code (allocation of ByteBuffer removed) GC:**
> ![](https://i.loli.net/2019/05/07/5cd16dae1dbc2.png)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1774) REPL and Shell Client for Admin Message RQ/RP

2019-05-08 Thread Viktor Somogyi-Vass (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835466#comment-16835466
 ] 

Viktor Somogyi-Vass commented on KAFKA-1774:


The POC is here: https://github.com/viktorsomogyi/kafka/tree/kafka_shell

> REPL and Shell Client for Admin Message RQ/RP
> -
>
> Key: KAFKA-1774
> URL: https://issues.apache.org/jira/browse/KAFKA-1774
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Joe Stein
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> We should have a REPL we can work in and execute the commands with the 
> arguments. With this we can do:
> ./kafka.sh --shell 
> kafka>attach cluster -b localhost:9092;
> kafka>describe topic sampleTopicNameForExample;
> The command line version can work like it does now, so folks don't have to 
> re-write all of their tooling:
> kafka.sh --topics (everything the same as kafka-topics.sh is now) 
> kafka.sh --reassign (everything the same as kafka-reassign-partitions.sh is 
> now) 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8336) Enable dynamic update of client-side SSL factory in brokers

2019-05-08 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-8336:
-

 Summary: Enable dynamic update of client-side SSL factory in 
brokers
 Key: KAFKA-8336
 URL: https://issues.apache.org/jira/browse/KAFKA-8336
 Project: Kafka
  Issue Type: Improvement
  Components: core
Affects Versions: 2.2.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.3.0


We currently support dynamic updates of server-side keystores. This allows 
expired certs to be updated on brokers without a rolling restart. When mutual 
authentication is enabled for inter-broker communication 
(ssl.client.auth=required), we don't currently update client-side keystores 
dynamically for the controller or transaction coordinator, so a broker restart 
(or controller change) is required for a cert update in this case. Since 
short-lived SSL certs are a common use case, we should enable client-side cert 
updates for all client connections initiated by the broker, to ensure that SSL 
certificate expiry can be handled with dynamic config updates on brokers for 
all configurations.
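
For context, the existing server-side reload is driven by per-broker dynamic 
configs. Below is a minimal sketch, assuming broker id "0", a listener named 
INTERNAL, and illustrative file paths and passwords (none of these come from 
the ticket), of pushing a new keystore so the broker picks it up without a 
restart; as described above, this currently refreshes only the server-side SSL 
factory:
{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class KeystoreRollover {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // per-broker dynamic config: the broker reloads the keystore file
            // for the named listener when this update is applied
            ConfigResource broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0");
            Config update = new Config(Arrays.asList(
                    new ConfigEntry("listener.name.internal.ssl.keystore.location",
                            "/etc/kafka/ssl/broker0-new.jks"),
                    new ConfigEntry("listener.name.internal.ssl.keystore.password",
                            "changeit")));
            admin.alterConfigs(Collections.singletonMap(broker0, update)).all().get();
        }
    }
}
{code}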

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7817) Multiple Consumer Group Management with Regex

2019-05-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835410#comment-16835410
 ] 

ASF GitHub Bot commented on KAFKA-7817:
---

rootex- commented on pull request #6700: KAFKA-7817 ConsumerGroupCommand Regex 
Feature
URL: https://github.com/apache/kafka/pull/6700
 
 
   *KAFKA-7817 ConsumerGroupCommand Regex Feature*
   Description: *Add ability to select a subset of consumer groups using regex 
for operations: --describe, --delete and --reset-offsets*
   JIRA: [Multiple Consumer Group Management with 
Regex](https://issues.apache.org/jira/browse/KAFKA-7817)
   Discussion 1. [Multiple Consumer Group 
Management](https://www.mail-archive.com/dev@kafka.apache.org/msg93781.html)
   Discussion 2. [Re: ConsumerGroupCommand tool 
improvement?](https://www.mail-archive.com/dev@kafka.apache.org/msg90561.html)
   
   *Unit tests implemented*
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Multiple Consumer Group Management with Regex
> -
>
> Key: KAFKA-7817
> URL: https://issues.apache.org/jira/browse/KAFKA-7817
> Project: Kafka
>  Issue Type: New Feature
>  Components: tools
>Affects Versions: 2.1.0
>Reporter: Alex Dunayevsky
>Assignee: Alex Dunayevsky
>Priority: Minor
>
> //TODO:
> New feature: Provide ConsumerGroupCommand with ability to query/manage 
> multiple consumer groups using a single regex pattern. 
>  
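
Until such a flag exists, the selection can be approximated client-side. A 
hedged sketch using the 2.x AdminClient; the bootstrap address and group 
pattern are illustrative:
{code:java}
import java.util.Collection;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupListing;

public class GroupRegexSelect {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        Pattern pattern = Pattern.compile("events-.*");

        try (AdminClient admin = AdminClient.create(props)) {
            // list every group, then keep the ids matching the regex;
            // the proposed tool flag would do this selection internally
            Collection<ConsumerGroupListing> groups = admin.listConsumerGroups().all().get();
            List<String> selected = groups.stream()
                    .map(ConsumerGroupListing::groupId)
                    .filter(id -> pattern.matcher(id).matches())
                    .collect(Collectors.toList());
            System.out.println("Groups selected by regex: " + selected);
        }
    }
}
{code}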



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-08 Thread Boquan Tang (JIRA)
Boquan Tang created KAFKA-8335:
--

 Summary: Log cleaner skips Transactional mark and batch record, 
causing unlimited growth of __consumer_offsets
 Key: KAFKA-8335
 URL: https://issues.apache.org/jira/browse/KAFKA-8335
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.2.0
Reporter: Boquan Tang


My colleague Weichu has already sent a mail to the Kafka user mailing list 
regarding this issue, but we think it's worth having a ticket to track it.

We have been using Kafka Streams with exactly-once enabled on a Kafka cluster 
for a while.

Recently we found that the size of the __consumer_offsets partitions grew huge. 
Some partitions went over 30G. This caused Kafka to take quite long to load 
"__consumer_offsets" on startup (it loads the topic in order to become group 
coordinator).

We dumped the __consumer_offsets segments and found that while normal offset 
commits are nicely compacted, transaction records (COMMIT, etc.) are all 
preserved. It looks like, since these messages don't have a key, the LogCleaner 
is keeping them all:

--
$ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
/003484332061.log --key-decoder-class
kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
Dumping 003484332061.log
Starting offset: 3484332061
offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
endTxnMarker: COMMIT coordinatorEpoch: 81
offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
endTxnMarker: COMMIT coordinatorEpoch: 84
...
--

Streams does a transaction commit every 100 ms (commit.interval.ms=100 when 
exactly-once is enabled), so __consumer_offsets is growing really fast; the 
config sketch below shows where this cadence comes from.

Is keeping all transaction records by design, or is this a bug in the 
LogCleaner? What would be the way to clean up the topic?
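
For reference, a minimal Streams configuration sketch showing where that commit 
cadence is controlled (the application id and bootstrap address are 
illustrative):
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class EosCommitInterval {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-eos-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // with exactly-once, commit.interval.ms defaults to 100 instead of
        // 30000, so a transaction (and its COMMIT markers) is written roughly
        // ten times per second per task
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        // raising the interval reduces the marker rate at the cost of latency
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
    }
}
{code}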





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-08 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835391#comment-16835391
 ] 

Andrew commented on KAFKA-8317:
---

OK, thanks for the reply. I will try this later. What surprises me, though, is 
why it works fine when I don't use suppress().

> ClassCastException using KTable.suppress()
> --
>
> Key: KAFKA-8317
> URL: https://issues.apache.org/jira/browse/KAFKA-8317
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error :
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed 
> cannot be cast to java.lang.String
>     at 
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows :
> {Code}
>     final KTable<Windowed<String>, GenericRecord> groupTable = groupedStream
>             .aggregate(lastAggregator, lastAggregator, materialized);
>     final KTable<Windowed<String>, GenericRecord> suppressedTable = groupTable
>             .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>     // write the change-log stream to the topic
>     suppressedTable.toStream((k, v) -> k.key())
>             .mapValues(joinValueMapper::apply)
>             .to(props.joinTopic());
> {Code}
> The code without using `suppressedTable` works... What am I doing wrong?
> Someone else has encountered the same issue :
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation : 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800
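
A workaround that is commonly suggested for this class of error (a sketch under 
the assumption that the default key serde is the String serde while the actual 
key is Windowed<String>; the serde helper and store name are illustrative, and 
imports from org.apache.kafka.streams.kstream, org.apache.kafka.streams.state 
and org.apache.kafka.common.serialization are elided) is to declare the serdes 
explicitly on the aggregation, so that suppress() does not fall back to the 
defaults when serializing its buffer:
{code:java}
// continues the snippet above; groupedStream, lastAggregator and
// joinValueMapper are the reporter's objects, myAvroSerde() is an assumed
// helper returning a configured Serde<GenericRecord>
final Serde<GenericRecord> genericRecordSerde = myAvroSerde();

final Materialized<String, GenericRecord, WindowStore<Bytes, byte[]>> materialized =
        Materialized.<String, GenericRecord, WindowStore<Bytes, byte[]>>as("group-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(genericRecordSerde);

final KTable<Windowed<String>, GenericRecord> groupTable =
        groupedStream.aggregate(lastAggregator, lastAggregator, materialized);

// with the serdes declared upstream, the suppress() buffer no longer tries to
// serialize the Windowed key with the default StringSerializer
final KTable<Windowed<String>, GenericRecord> suppressedTable = groupTable
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
{code}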



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8317) ClassCastException using KTable.suppress()

2019-05-08 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835390#comment-16835390
 ] 

Andrew commented on KAFKA-8317:
---

OK, thanks for the reply. I will try this later. What surprises me, though, is 
why it should work fine when I don't use suppress().

> ClassCastException using KTable.suppress()
> --
>
> Key: KAFKA-8317
> URL: https://issues.apache.org/jira/browse/KAFKA-8317
> Project: Kafka
>  Issue Type: Bug
>Reporter: Andrew
>Priority: Major
>
> I am trying to use `KTable.suppress()` and I am getting the following error :
> {Code}
> java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed 
> cannot be cast to java.lang.String
>     at 
> org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:95)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:87)
>     at 
> org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:40)
>     at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:117)
> {Code}
> My code is as follows :
> {Code}
>     final KTable<Windowed<String>, GenericRecord> groupTable = groupedStream
>             .aggregate(lastAggregator, lastAggregator, materialized);
>     final KTable<Windowed<String>, GenericRecord> suppressedTable = groupTable
>             .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
>     // write the change-log stream to the topic
>     suppressedTable.toStream((k, v) -> k.key())
>             .mapValues(joinValueMapper::apply)
>             .to(props.joinTopic());
> {Code}
> The code without using `suppressedTable` works... What am I doing wrong?
> Someone else has encountered the same issue :
> https://gist.github.com/robie2011/1caa4772b60b5a6f993e6f98e792a380
> Slack conversation : 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556633088239800



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8106) Reducing the allocation and copying of ByteBuffer when logValidator do validation.

2019-05-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835389#comment-16835389
 ] 

ASF GitHub Bot commented on KAFKA-8106:
---

Flowermin commented on pull request #6699: KAFKA-8106:Reducing the allocation 
and copying of ByteBuffer when logValidator do validation(target trunk). 
URL: https://github.com/apache/kafka/pull/6699
 
 
   We suggest reducing the allocation and copying of ByteBuffer when 
logValidator does validation, in the case where the magic value in use is above 
1 and no format conversion or value overwriting is required for compressed 
messages. The improved code is as follows.
   1. Add a class **SimplifiedDefaultRecord** implementing the interface 
Record, which defines the various attributes of a message.
   2. Add a function **simplifiedReadFrom()** to class **DefaultRecord**. This 
function does not read data from the DataInput into a newly created ByteBuffer. 
**This reduces the allocation and copying of ByteBuffer** when logValidator 
does validation, which in turn reduces GC frequency. We offer a simple read 
function that reads data from the **DataInput** without creating a ByteBuffer. 
Of course, this operation cannot avoid decompressing the data.
   3. Add functions **simplifiedIterator()** and 
**simplifiedCompressedIterator()** to class **DefaultRecordBatch**. These two 
functions return iterators over instances of class **SimplifiedDefaultRecord**.
   4. Modify the code of function 
**validateMessagesAndAssignOffsetsCompressed()** in class LogValidator.
   
   **After modifying the code to reduce the allocation and copying of 
ByteBuffer**, the test performance is greatly improved, and the CPU's stable 
usage is below 60%. The following is a comparison of test performance for the 
two code versions under the same conditions; a simplified sketch of the change 
follows below.
   **Result of performance testing**
   Main config of Kafka: Single message: 1024B; TopicPartitions: 200; linger.ms: 1000ms.
   **1. Before modified code (source code):**
   Network inflow rate: 600M/s; CPU: 97%; production: 25,000,000 messages/s
   **2. After modified code (allocation of ByteBuffer removed):**
   Network inflow rate: 1G/s; CPU: <60%; production: 41,000,000 messages/s
   **1. Before modified code (source code) GC:**
   ![](https://i.loli.net/2019/05/07/5cd16df163ad3.png)
   **2. After modified code (allocation of ByteBuffer removed) GC:**
   ![](https://i.loli.net/2019/05/07/5cd16dae1dbc2.png)
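
To make the idea concrete, here is a simplified sketch (not the actual Kafka 
internals; the field layout and the varint helpers are stand-ins) of the 
difference between allocating a ByteBuffer per record and decoding fields 
straight off the DataInput:
{code:java}
import java.io.DataInput;
import java.io.IOException;
import java.nio.ByteBuffer;

public class RecordReadSketch {

    // current style: one buffer allocation and one full copy per record
    static ByteBuffer readWithAllocation(DataInput in, int sizeOfBodyInBytes) throws IOException {
        byte[] body = new byte[sizeOfBodyInBytes];
        in.readFully(body);               // copy the whole record body
        return ByteBuffer.wrap(body);     // fields are decoded from this buffer later
    }

    // proposed style: decode fields directly from the stream with no
    // intermediate buffer, producing far less garbage per validated batch
    static void readStreaming(DataInput in) throws IOException {
        byte attributes = in.readByte();
        long timestampDelta = readVarlong(in);
        int offsetDelta = readVarint(in);
        // ... remaining fields are read the same way ...
    }

    // minimal zig-zag varint readers so the sketch is self-contained
    static int readVarint(DataInput in) throws IOException {
        int value = 0, shift = 0, b;
        while (((b = in.readUnsignedByte()) & 0x80) != 0) {
            value |= (b & 0x7f) << shift;
            shift += 7;
        }
        value |= b << shift;
        return (value >>> 1) ^ -(value & 1);
    }

    static long readVarlong(DataInput in) throws IOException {
        long value = 0;
        int shift = 0, b;
        while (((b = in.readUnsignedByte()) & 0x80) != 0) {
            value |= (long) (b & 0x7f) << shift;
            shift += 7;
        }
        value |= (long) b << shift;
        return (value >>> 1) ^ -(value & 1);
    }
}
{code}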
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reducing the allocation and copying of ByteBuffer  when logValidator  do 
> validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1
> Environment: Server : 
> cpu:2*16 ; 
> MemTotal : 256G;
> Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection ; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
>       We did performance testing of Kafka in the specific scenarios described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. Then we started many producer processes to 
> send large amounts of messages to one of the topics in each test.
> *_Specific Scenario_*
>   
>  *_1.Main config of Kafka_*  
>  # Main config of the Kafka server: num.network.threads=6; num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartitions: 50~2000
>  # Size of a single message: 1024B
>  
>  *_2.Config of KafkaProducer_* 
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3.The best result of performance testing_*  
> ||Network inflow rate||CPU used (%)||Disk write speed||Production throughput||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4.Phenomenon and my doubt_*
>     _The upper limit of CPU usage has been reached, but the upper limit of the 
> server's network bandwidth has not. *We want to find out what costs so much 
> CPU time, so that we can improve performance and reduce the CPU usage of the 
> Kafka server.*_
> _*5.Analysis*_
>         We analyzed JFR recordings of the Kafka server taken during the 
> performance testing. After checking and re-running the performance test, we 
> located the code 

[jira] [Assigned] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted

2019-05-08 Thread Ivan Yurchenko (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Yurchenko reassigned KAFKA-8206:
-

Assignee: Ivan Yurchenko

> A consumer can't discover new group coordinator when the cluster was partly 
> restarted
> -
>
> Key: KAFKA-8206
> URL: https://issues.apache.org/jira/browse/KAFKA-8206
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0, 2.0.0, 2.2.0
>Reporter: alex gabriel
>Assignee: Ivan Yurchenko
>Priority: Critical
>
> *A consumer can't discover new group coordinator when the cluster was partly 
> restarted*
> Preconditions:
> I use the Kafka server and the Java kafka-client lib, version 2.2.
> I have 2 Kafka nodes running locally (localhost:9092, localhost:9093) and 1 
> ZK (localhost:2181).
> I have replication factor 2 for all my topics and 
> '_unclean.leader.election.enable=true_' on both Kafka nodes.
> Steps to reproduce:
> 1) Start 2nodes (localhost:9092/localhost:9093)
> 2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
> {noformat}
> // discovered group coordinator (0-node)
> 2019-04-09 16:23:18,963 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
> ...metadatacache is updated (2 nodes in the cluster list)
> 2019-04-09 16:23:18,928 DEBUG 
> [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - 
> [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending 
> metadata request (type=MetadataRequest, topics=) to node localhost:9092 
> (id: -1 rack: null)>
> 2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), 
> localhost:9093 (id: 1 rack: null)], partitions = [], controller = 
> localhost:9092 (id: 0 rack: null))}>
> {noformat}
> 3) Shutdown 1-node (localhost:9093)
> {noformat}
> // metadata was updated to version 4 (but for some reason it still had 2 
> alive nodes inside the cluster)
> 2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - 
> Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = 
> P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), 
> localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = 
> events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], 
> offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader 
> = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = 
> localhost:9092 (id: 0 rack: null))}>
> // the consumer thinks that node 1 is still alive and tries to send a 
> coordinator lookup to it, but fails
> 2019-04-09 16:23:46,981 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
> 2019-04-09 16:23:46,981 INFO 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group 
> coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or 
> invalid, will attempt rediscovery>
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
> 2019-04-09 16:24:01,117 WARN 
> [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer 
> clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 
> (localhost:9093) could not be established. Broker may not be available.>
> // refreshing metadata again
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, 
> apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 
> being disconnected>
> 2019-04-09 16:24:01,117 DEBUG 
> [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady]
>  - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] 
> Coordinator discovery failed, refreshing metadata>
> // metadata was updated to version 5, where the cluster had only node 0 
> (localhost:9092), as expected.
> 2019-04-09 

[jira] [Commented] (KAFKA-8247) Duplicate error handling in kafka-server-start.sh and actual Kafka class

2019-05-08 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-8247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16835344#comment-16835344
 ] 

Sönke Liebau commented on KAFKA-8247:
-

So... these checks actually make a little bit of sense, because the error 
message shown here is the only place where the -daemon parameter, which can be 
used to send the process to the background, is documented.
Since this parameter is handled in kafka-run-class it is honored by all command 
line tools, regardless of whether that makes sense or not, so we might just add 
it to the default options next to --help and --version. The downside is that 
this will make the inconsistency between --help/--version and -daemon glaringly 
obvious; not much we can do about that.

I'm afraid there is no perfect solution here; handling parameters in two 
different places is just not possible in a clean way. I'll create a pull 
request to remove these duplicate checks for now, as it shouldn't break 
anything and it fixes the immediate issue.

> Duplicate error handling in kafka-server-start.sh and actual Kafka class
> 
>
> Key: KAFKA-8247
> URL: https://issues.apache.org/jira/browse/KAFKA-8247
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: Sönke Liebau
>Assignee: Sönke Liebau
>Priority: Minor
>
> There is some duplication of error handling for command line parameters that 
> are passed into kafka-server-start.sh.
>  
> The shell script prints an error if no arguments are passed in, effectively 
> causing the same check in 
> [Kafka|https://github.com/apache/kafka/blob/92db08cba582668d77160b0c2853efd45a1b809b/core/src/main/scala/kafka/Kafka.scala#L43]
>  to never be triggered, unless the only option that is specified is -daemon, 
> which would be removed before the arguments are passed to the Java class.
>  
> While not in any way critical, I don't think that this is intended behavior. 
> I think we should remove the extra check in kafka-server-start.sh and leave 
> argument handling up to the Kafka class.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)