[jira] [Commented] (KAFKA-6172) Cache lastEntry in TimeIndex to avoid unnecessary disk access

2018-03-06 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388714#comment-16388714
 ] 

Ivan Babrou commented on KAFKA-6172:


GitHub doesn't report this as included in 1.0.1:

* 
https://github.com/apache/kafka/commit/0c895706e8ab511efe352a824a0c9e2dab62499e

> Cache lastEntry in TimeIndex to avoid unnecessary disk access
> -
>
> Key: KAFKA-6172
> URL: https://issues.apache.org/jira/browse/KAFKA-6172
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.0.1
>
>
> LogSegment.close() calls timeIndex.maybeAppend(...), which in turn makes a 
> number of calls to timeIndex.lastEntry(). Currently timeIndex.lastEntry() 
> involves a disk seek, because it reads the last few bytes of the index file 
> on disk. This slows down the broker shutdown process.
> Here is the duration of LogManager.shutdown() in various settings. In all 
> these tests the broker has roughly 6k partitions and 20k segments.
> - If the broker does not have this patch and `log.dirs` is configured with 1 
> JBOD log directory, LogManager.shutdown() takes 15 minutes (roughly 900 seconds).
> - If the broker does not have this patch and `log.dirs` is configured with 10 
> JBOD log directories, LogManager.shutdown() takes 84 seconds.
> - If the broker has this patch and `log.dirs` is configured with 10 JBOD log 
> directories, LogManager.shutdown() takes 24 seconds.
> Thus we expect this optimization to save 71% of the time spent in 
> LogManager.shutdown(). This patch intends to reduce broker shutdown time by 
> caching the lastEntry in memory, so that the broker does not always have to 
> read from disk to get the lastEntry.
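As a rough illustration of the optimization described above, here is a minimal
sketch of the caching idea (hypothetical names such as CachedTimeIndex and
TimestampOffset; this is not Kafka's actual TimeIndex code): keep the last
entry in a field that is initialized once from disk and refreshed on append.

{code}
// Sketch only: lastEntry is served from memory; the file is read once at
// construction time and updated in place on append.
case class TimestampOffset(timestamp: Long, offset: Long)

class CachedTimeIndex {
  // Placeholder for the real read of the last entry from the index file.
  private def readLastEntryFromFile(): TimestampOffset = TimestampOffset(-1L, 0L)

  // Initialized once from disk, then kept up to date on every append.
  @volatile private var lastEntryCache: TimestampOffset = readLastEntryFromFile()

  def lastEntry: TimestampOffset = lastEntryCache  // no disk access on this path

  def maybeAppend(timestamp: Long, offset: Long): Unit = this.synchronized {
    if (timestamp > lastEntryCache.timestamp) {
      // ... append the new entry to the index file here ...
      lastEntryCache = TimestampOffset(timestamp, offset)  // refresh the cache
    }
  }
}
{code}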



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6175) AbstractIndex should cache index file to avoid unnecessary disk access during resize()

2018-03-06 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388713#comment-16388713
 ] 

Ivan Babrou commented on KAFKA-6175:


GitHub doesn't report this as included in 1.0.1:

* 
https://github.com/apache/kafka/commit/12af521c487a146456442f895b9fc99a45ed100f

> AbstractIndex should cache index file to avoid unnecessary disk access during 
> resize()
> --
>
> Key: KAFKA-6175
> URL: https://issues.apache.org/jira/browse/KAFKA-6175
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Fix For: 1.0.1
>
>
> Currently, when we shut down a broker, we call AbstractIndex.resize() for 
> all segments on the broker, regardless of whether the log segment is active 
> or not. AbstractIndex.resize() calls raf.setLength(), which is expensive 
> because it accesses the disk. If we take a thread dump during either 
> LogManager.shutdown() or LogManager.loadLogs(), most threads are in RUNNABLE 
> state at java.io.RandomAccessFile.setLength().
> This patch intends to speed up broker startup and shutdown by skipping 
> AbstractIndex.resize() for inactive log segments.
> Here is the duration of LogManager.shutdown() in various settings. In all 
> these tests the broker has roughly 6k partitions and 19k segments.
> - If the broker has neither this patch nor KAFKA-6172, LogManager.shutdown() 
> takes 69 seconds.
> - If the broker has KAFKA-6172 but not this patch, LogManager.shutdown() 
> takes 21 seconds.
> - If the broker has both KAFKA-6172 and this patch, LogManager.shutdown() 
> takes 1.6 seconds.
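As a rough illustration of the kind of change described above, here is a
minimal sketch (hypothetical names; not Kafka's actual AbstractIndex code):
keep the known file length in memory and make resize() a no-op when nothing
changes, so an inactive segment's index is never touched on disk during
shutdown or startup.

{code}
import java.io.RandomAccessFile

// Sketch only: resize() skips raf.setLength() when the cached length already
// matches the requested size.
class CachedLengthIndex(path: String, initialSize: Long) {
  private val raf = new RandomAccessFile(path, "rw")
  raf.setLength(initialSize)
  private var knownLength: Long = initialSize  // cached; avoids touching file metadata

  def resize(newSize: Long): Boolean = this.synchronized {
    if (newSize == knownLength) {
      false                      // no-op: no disk access at all
    } else {
      raf.setLength(newSize)     // the expensive disk operation
      knownLength = newSize
      true
    }
  }

  def close(): Unit = raf.close()
}
{code}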



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6423) Slow shutdown with many open files

2018-03-06 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6423?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16388711#comment-16388711
 ] 

Ivan Babrou commented on KAFKA-6423:


[~doli], is this solved by KAFKA-6175 and KAFKA-6172?

> Slow shutdown with many open files
> --
>
> Key: KAFKA-6423
> URL: https://issues.apache.org/jira/browse/KAFKA-6423
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>Priority: Major
>
> We have brokers with 20k open files, and shutdown is extremely slow, usually 
> progressing at around 60 closed file descriptors per second:
> {noformat}
> $ while true; do echo $(date) $(sudo ls /proc/6363/fd | wc -l); sleep 1; done
> Thu Jan 4 23:00:51 UTC 2018 9770
> Thu Jan 4 23:00:52 UTC 2018 9691
> Thu Jan 4 23:00:53 UTC 2018 9616
> Thu Jan 4 23:00:54 UTC 2018 9561
> Thu Jan 4 23:00:55 UTC 2018 9509
> Thu Jan 4 23:00:56 UTC 2018 9427
> Thu Jan 4 23:00:57 UTC 2018 9350
> Thu Jan 4 23:00:58 UTC 2018 9260
> Thu Jan 4 23:00:59 UTC 2018 9208
> {noformat}
> If you strace the process, you can see:
> {noformat}
> $ sudo strace -f -c -p 6363
> strace: Process 6363 attached with 97 threads
> ^Cstrace: Process 6363 detached
> strace: Process 6604 detached
> strace: Process 6605 detached
> strace: Process 6606 detached
> strace: Process 6607 detached
> strace: Process 6608 detached
> strace: Process 6609 detached
> strace: Process 6610 detached
> strace: Process 6611 detached
> strace: Process 6612 detached
> strace: Process 6613 detached
> strace: Process 6614 detached
> strace: Process 6615 detached
> strace: Process 6616 detached
> strace: Process 6617 detached
> strace: Process 6618 detached
> strace: Process 6619 detached
> strace: Process 6620 detached
> strace: Process 6621 detached
> strace: Process 6622 detached
> strace: Process 6623 detached
> strace: Process 6624 detached
> strace: Process 6625 detached
> strace: Process 6626 detached
> strace: Process 6627 detached
> strace: Process 6628 detached
> strace: Process 6629 detached
> strace: Process 6630 detached
> strace: Process 6631 detached
> strace: Process 6632 detached
> strace: Process 6633 detached
> strace: Process 6634 detached
> strace: Process 6635 detached
> strace: Process 6636 detached
> strace: Process 6637 detached
> strace: Process 6638 detached
> strace: Process 6639 detached
> strace: Process 6640 detached
> strace: Process 6641 detached
> strace: Process 6642 detached
> strace: Process 6643 detached
> strace: Process 6644 detached
> strace: Process 6645 detached
> strace: Process 6646 detached
> strace: Process 6647 detached
> strace: Process 6648 detached
> strace: Process 6649 detached
> strace: Process 6650 detached
> strace: Process 6651 detached
> strace: Process 6652 detached
> strace: Process 6653 detached
> strace: Process 6654 detached
> strace: Process 6655 detached
> strace: Process 6656 detached
> strace: Process 6657 detached
> strace: Process 6658 detached
> strace: Process 6659 detached
> strace: Process 6660 detached
> strace: Process 6661 detached
> strace: Process 6662 detached
> strace: Process 6663 detached
> strace: Process 6716 detached
> strace: Process 6717 detached
> strace: Process 6718 detached
> strace: Process 6719 detached
> strace: Process 6720 detached
> strace: Process 6721 detached
> strace: Process 6722 detached
> strace: Process 6723 detached
> strace: Process 6724 detached
> strace: Process 6725 detached
> strace: Process 6726 detached
> strace: Process 6727 detached
> strace: Process 6728 detached
> strace: Process 6729 detached
> strace: Process 6730 detached
> strace: Process 6731 detached
> strace: Process 6732 detached
> strace: Process 6733 detached
> strace: Process 6734 detached
> strace: Process 6735 detached
> strace: Process 6736 detached
> strace: Process 6737 detached
> strace: Process 6738 detached
> strace: Process 6739 detached
> strace: Process 6740 detached
> strace: Process 6741 detached
> strace: Process 6760 detached
> strace: Process 6779 detached
> strace: Process 6781 detached
> strace: Process 6783 detached
> strace: Process 6892 detached
> strace: Process 2339 detached
> strace: Process 2340 detached
> strace: Process 5122 detached
> strace: Process 5123 detached
> strace: Process 5652 detached
> % time     seconds  usecs/call     calls    errors syscall
> ------ ----------- ----------- --------- --------- ----------------
>  65.19    0.859302       35804        24        19 restart_syscall
>  26.60    0.350656         507       692       190 futex
>   5.17    0.068142        2271        30           epoll_wait
>   1.22    0.016141          56       287           ftruncate
>   0.66    0.008679          20       432           close
>   0.38    0.005054          35       144           fsync
>   0.26    0.003489          12       288           open
>   0.19   

[jira] [Commented] (KAFKA-6238) Issues with protocol version when applying a rolling upgrade to 1.0.0

2018-02-16 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6238?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16367886#comment-16367886
 ] 

Ivan Babrou commented on KAFKA-6238:


Thanks, that's precisely what we've done.
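For reference, a sketch of the two-stage procedure from the upgrade
documentation (the property names are from the issue and the docs; the exact
version strings below are an assumption and depend on your starting release):

{noformat}
# Stage 1: roll out the 1.0.0 binaries with both properties pinned to the old
# version, then do a rolling restart.
inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

# Stage 2: bump the inter-broker protocol and do another rolling restart.
inter.broker.protocol.version=1.0
log.message.format.version=0.10.0

# Stage 3: once clients are upgraded, bump the message format and restart again.
inter.broker.protocol.version=1.0
log.message.format.version=1.0
{noformat}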

> Issues with protocol version when applying a rolling upgrade to 1.0.0
> -
>
> Key: KAFKA-6238
> URL: https://issues.apache.org/jira/browse/KAFKA-6238
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 1.0.0
>Reporter: Diego Louzán
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 1.0.1
>
>
> Hello,
> I am trying to perform a rolling upgrade from 0.10.0.1 to 1.0.0, and 
> according to the instructions in the documentation, I should only have to 
> upgrade the "inter.broker.protocol.version" parameter in the first step. But 
> after setting the value to "0.10.0" or "0.10.0.1" (tried both), the broker 
> refuses to start with the following error:
> {code}
> [2017-11-20 08:28:46,620] FATAL  (kafka.Kafka$)
> java.lang.IllegalArgumentException: requirement failed: 
> log.message.format.version 1.0-IV0 cannot be used when 
> inter.broker.protocol.version is set to 0.10.0.1
> at scala.Predef$.require(Predef.scala:224)
> at kafka.server.KafkaConfig.validateValues(KafkaConfig.scala:1205)
> at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1170)
> at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:881)
> at kafka.server.KafkaConfig$.fromProps(KafkaConfig.scala:878)
> at 
> kafka.server.KafkaServerStartable$.fromProps(KafkaServerStartable.scala:28)
> at kafka.Kafka$.main(Kafka.scala:82)
> at kafka.Kafka.main(Kafka.scala)
> {code}
> I checked the instructions for rolling upgrades to previous versions (namely 
> 0.11.0.0), and there it's stated that it is also necessary to upgrade the 
> "log.message.format.version" parameter in two stages. I tried that and the 
> upgrade worked. It seems this still applies to version 1.0.0, so I'm not sure 
> whether this is wrong documentation or an actual issue with Kafka, since it 
> should work as stated in the docs.
> Regards,
> Diego Louzán



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6465) Add a metric for the number of records per log

2018-01-21 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-6465:
--

 Summary: Add a metric for the number of records per log
 Key: KAFKA-6465
 URL: https://issues.apache.org/jira/browse/KAFKA-6465
 Project: Kafka
  Issue Type: Bug
Reporter: Ivan Babrou


Currently there are log metrics for:
 * Start offset
 * End offset
 * Size in bytes
 * Number of segments

I propose adding another metric to track the number of record batches in the 
log. This should give operators an idea of how much batching is happening on 
the producers. Having this metric in one place seems easier than scraping the 
metric from each producer.

Having an absolute counter may be infeasible (batches are not assigned 
sequential IDs), but a gauge should be OK. Average batch size can be calculated 
as (end offset - start offset) / number of batches. This will be heavily skewed 
for logs with long retention, though.
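As a toy illustration with made-up numbers: for a log with start offset 
1,000,000,000, end offset 1,000,600,000 and a batch-count gauge reading 50,000, 
the derived average batch size would be (1,000,600,000 - 1,000,000,000) / 
50,000 = 12 records per batch. With long retention the gauge mixes old and new 
batches, so this average reacts slowly to changes in producer batching.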



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6444) Kafka consumers and producers get confused by dualstack A + AAAA DNS records

2018-01-13 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-6444:
--

 Summary: Kafka consumers and producers get confused by dualstack A 
+ AAAA DNS records
 Key: KAFKA-6444
 URL: https://issues.apache.org/jira/browse/KAFKA-6444
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Ivan Babrou


We have hostnames with both A (IPv4) and AAAA (IPv6) DNS records. Kafka is 
configured to listen only on IPv6 by manually setting the IP to listen on and 
advertise outside.

Brokers have no issue communicating among themselves, because they are not 
given the option to resolve hostnames and pick an IP protocol version.

Consumers and producers have to use bootstrap hostnames and do not try to 
connect over IPv6 at all; they are stuck in SYN_SENT over IPv4:

{noformat}
syslog-ng 12621  999  123u  IPv6 2411122889  0t0  TCP 
192.168.0.2:41626->192.168.0.1:9092 (SYN_SENT)
{noformat}

This happened to the consumer in the syslog-ng output plugin:

* https://github.com/balabit/syslog-ng/issues/1835

It also happened to a Flink consumer, although I do not have any more info 
about that one. We fixed the issue by only providing AAAA records for 
bootstrapping.

Previously we saw the opposite problem with dualstack: software that does not 
implement Happy Eyeballs connects only to the IPv6 address, which is 
firewalled. This happened to SSH (the client gets stuck if you don't supply -4) 
and Go (https://github.com/golang/go/issues/5), to give a couple of examples.

The solution for this is Happy Eyeballs: 
https://en.wikipedia.org/wiki/Happy_Eyeballs

Kafka clients should connect to IPv6 first and then fall back to IPv4 if it is 
not available.
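A toy sketch of that fallback order (not Kafka's client code, and a sequential
simplification of real Happy Eyeballs, which races attempts in parallel; the
names here are hypothetical):

{code}
import java.io.IOException
import java.net.{Inet6Address, InetAddress, InetSocketAddress, Socket}

// Resolve every record for a bootstrap host, try IPv6 addresses first,
// then fall back to the IPv4 addresses.
object DualStackConnect {
  def connect(host: String, port: Int, timeoutMs: Int = 5000): Socket = {
    val all = InetAddress.getAllByName(host).toSeq
    val ordered = all.filter(_.isInstanceOf[Inet6Address]) ++
      all.filterNot(_.isInstanceOf[Inet6Address])
    for (addr <- ordered) {
      val socket = new Socket()
      try {
        socket.connect(new InetSocketAddress(addr, port), timeoutMs)
        return socket  // first address that answers wins
      } catch {
        case _: IOException => socket.close()  // try the next address
      }
    }
    throw new IOException(s"no address for $host answered on port $port")
  }
}
{code}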

There is also KAFKA-3347.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3347) Configure java to prefer ipv4

2018-01-13 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3347?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16325293#comment-16325293
 ] 

Ivan Babrou commented on KAFKA-3347:


Can this be closed as wontfix? Calendar says it's 2018 already.

> Configure java to prefer ipv4
> -
>
> Key: KAFKA-3347
> URL: https://issues.apache.org/jira/browse/KAFKA-3347
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Jeremy Custenborder
>Assignee: Jeremy Custenborder
>Priority: Minor
>
> I've noticed that ports are sometimes binding on IPv6 addresses rather than 
> the IPv4 address I'm expecting. Can we change this so we bind on the IPv4 
> address rather than the IPv6 address? I'm proposing to add this to 
> KAFKA_JVM_PERFORMANCE_OPTS.
> {code}
> -Djava.net.preferIPv4Stack=true
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-12 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324857#comment-16324857
 ] 

Ivan Babrou commented on KAFKA-6441:


Looks like the issue is in Sarama, which only reads one record batch:

* https://github.com/Shopify/sarama/issues/1022

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as a consumer, but I don't think it's 
> relevant. The producer is syslog-ng with Kafka output. I'm not quite sure 
> which log format Kafka itself is using, but I assume 0.11.0.0, because that's 
> what is set in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with at least a minSize buffer containing 
> just one 1KB log message. When Sarama was using the older consumer API, 
> everything was okay. When we upgraded to the 0.11.0.0 consumer API, consumer 
> traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire and the 
> consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Resolved] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-12 Thread Ivan Babrou (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Babrou resolved KAFKA-6441.

Resolution: Invalid

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as a consumer, but I don't think it's 
> relevant. The producer is syslog-ng with Kafka output. I'm not quite sure 
> which log format Kafka itself is using, but I assume 0.11.0.0, because that's 
> what is set in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with at least a minSize buffer containing 
> just one 1KB log message. When Sarama was using the older consumer API, 
> everything was okay. When we upgraded to the 0.11.0.0 consumer API, consumer 
> traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire and the 
> consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-11 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323603#comment-16323603
 ] 

Ivan Babrou commented on KAFKA-6441:


I dumped raw bytes from Kafka responses and it seems like buffers are fully 
populated with messages. Digging deeper to find out what's causing Sarama to 
only read the first message.

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as a consumer, but I don't think it's 
> relevant. The producer is syslog-ng with Kafka output. I'm not quite sure 
> which log format Kafka itself is using, but I assume 0.11.0.0, because that's 
> what is set in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with at least a minSize buffer containing 
> just one 1KB log message. When Sarama was using the older consumer API, 
> everything was okay. When we upgraded to the 0.11.0.0 consumer API, consumer 
> traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire and the 
> consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-11 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323073#comment-16323073
 ] 

Ivan Babrou commented on KAFKA-6441:


I think it's a bit different: buffers for each partition are allocated based 
on maxBytes:

{noformat}
2018/01/11 21:48:58 Request: max wait time = 500, min bytes = 1, max bytes = 
104857600, isolation = 0, num blocks = 1
2018/01/11 21:48:58   fetch request block for partition 0: 
{fetchOffset:7075063209, maxBytes:2097152}
2018/01/11 21:48:58 Request: max wait time = 500, min bytes = 1, max bytes = 
104857600, isolation = 0, num blocks = 1
2018/01/11 21:48:58   fetch request block for partition 0: 
{fetchOffset:7075063209, maxBytes:2097152}
{noformat}

Here fetchRequestBlock translates roughly to PartitionData(offset, 
logStartOffset, maxBytes).

If I dump individual segments from the log, I see individual messages:

{noformat}
baseOffset: 15165279076 lastOffset: 15165279076 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9241092 CreateTime: 1515699408944 isvalid: true size: 910 
magic: 2 compresscodec: NONE crc: 456596511
baseOffset: 15165279077 lastOffset: 15165279077 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9242002 CreateTime: 1515699408955 isvalid: true size: 910 
magic: 2 compresscodec: NONE crc: 465015653
baseOffset: 15165279078 lastOffset: 15165279078 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9242912 CreateTime: 1515699408960 isvalid: true size: 908 
magic: 2 compresscodec: NONE crc: 1569816164
baseOffset: 15165279079 lastOffset: 15165279079 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9243820 CreateTime: 1515699408997 isvalid: true size: 915 
magic: 2 compresscodec: NONE crc: 1894915965
baseOffset: 15165279080 lastOffset: 15165279080 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9244735 CreateTime: 1515699409010 isvalid: true size: 916 
magic: 2 compresscodec: NONE crc: 2124364233
baseOffset: 15165279081 lastOffset: 15165279081 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9245651 CreateTime: 1515699409035 isvalid: true size: 918 
magic: 2 compresscodec: NONE crc: 1889246530
baseOffset: 15165279082 lastOffset: 15165279082 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9246569 CreateTime: 1515699409038 isvalid: true size: 914 
magic: 2 compresscodec: NONE crc: 877751927
baseOffset: 15165279083 lastOffset: 15165279083 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9247483 CreateTime: 1515699409061 isvalid: true size: 915 
magic: 2 compresscodec: NONE crc: 3313577153
baseOffset: 15165279084 lastOffset: 15165279084 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9248398 CreateTime: 1515699409132 isvalid: true size: 912 
magic: 2 compresscodec: NONE crc: 1951840175
baseOffset: 15165279085 lastOffset: 15165279085 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9249310 CreateTime: 1515699409133 isvalid: true size: 915 
magic: 2 compresscodec: NONE crc: 1357735233
baseOffset: 15165279086 lastOffset: 15165279086 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9250225 CreateTime: 1515699409137 isvalid: true size: 920 
magic: 2 compresscodec: NONE crc: 899719626
baseOffset: 15165279087 lastOffset: 15165279087 baseSequence: -1 lastSequence: 
-1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 140 isTransactional: 
false position: 9251145 CreateTime: 1515699409162 isvalid: true size: 915 
magic: 2 compresscodec: NONE crc: 1993963751
{noformat}

These should be combined when returned to the consumer if the buffer is large 
enough, but they are not for some reason.
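For anyone else debugging this, here is a toy sketch of why a client has to
loop over the returned buffer (simplified framing, hypothetical names, and not
Sarama's code or the exact Kafka wire format): the records field of a fetch
response is a concatenation of record batches, so decoding must continue until
the buffer is exhausted instead of stopping after the first batch.

{code}
import java.nio.ByteBuffer

object FetchResponseRecords {
  case class Batch(baseOffset: Long, payload: Array[Byte])

  // Assumed toy framing: 8-byte base offset + 4-byte length + payload, repeated.
  def readAllBatches(buf: ByteBuffer): Seq[Batch] = {
    val out = Seq.newBuilder[Batch]
    while (buf.remaining() >= 12) {
      val baseOffset = buf.getLong()
      val length = buf.getInt()
      if (length < 0 || length > buf.remaining())
        return out.result()            // partial batch at the tail: stop cleanly
      val payload = new Array[Byte](length)
      buf.get(payload)
      out += Batch(baseOffset, payload)
    }
    out.result()
  }
}
{code}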

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as a consumer, but I don't think it's 
> relevant. The producer is syslog-ng with Kafka output. I'm not quite sure 
> which log 

[jira] [Issue Comment Deleted] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Babrou updated KAFKA-6441:
---
Comment: was deleted

(was: With 0.10.2.0 consumer API Sarama is able to get multiple messages in one 
FetchResponse.

It doesn't seem right to get only one with 0.11.0.0 API.)

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as a consumer, but I don't think it's 
> relevant. The producer is syslog-ng with Kafka output. I'm not quite sure 
> which log format Kafka itself is using, but I assume 0.11.0.0, because that's 
> what is set in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with at least a minSize buffer containing 
> just one 1KB log message. When Sarama was using the older consumer API, 
> everything was okay. When we upgraded to the 0.11.0.0 consumer API, consumer 
> traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire and the 
> consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321715#comment-16321715
 ] 

Ivan Babrou commented on KAFKA-6441:


With 0.10.2.0 consumer API Sarama is able to get multiple messages in one 
FetchResponse.

It doesn't seem right to get only one with 0.11.0.0 API.

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as a consumer, but I don't think it's 
> relevant. The producer is syslog-ng with Kafka output. I'm not quite sure 
> which log format Kafka itself is using, but I assume 0.11.0.0, because that's 
> what is set in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with at least a minSize buffer containing 
> just one 1KB log message. When Sarama was using the older consumer API, 
> everything was okay. When we upgraded to the 0.11.0.0 consumer API, consumer 
> traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire and the 
> consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16321716#comment-16321716
 ] 

Ivan Babrou commented on KAFKA-6441:


With 0.10.2.0 consumer API Sarama is able to get multiple messages in one 
FetchResponse.

It doesn't seem right to get only one with 0.11.0.0 API.

> FetchRequest populates buffer of size MinBytes, even if response is smaller
> ---
>
> Key: KAFKA-6441
> URL: https://issues.apache.org/jira/browse/KAFKA-6441
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.1
>Reporter: Ivan Babrou
>
> We're using the Sarama Go client as a consumer, but I don't think it's 
> relevant. The producer is syslog-ng with Kafka output. I'm not quite sure 
> which log format Kafka itself is using, but I assume 0.11.0.0, because that's 
> what is set in the topic settings.
> Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a 
> silly reason, Kafka decides to reply with at least a minSize buffer containing 
> just one 1KB log message. When Sarama was using the older consumer API, 
> everything was okay. When we upgraded to the 0.11.0.0 consumer API, consumer 
> traffic for a 125Mbit/s topic spiked to 55000Mbit/s on the wire and the 
> consumer wasn't even able to keep up.
> A 1KB message in a 16MB buffer is 1,600,000% overhead.
> I don't think there's any valid reason to do this.
> It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
> changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6441) FetchRequest populates buffer of size MinBytes, even if response is smaller

2018-01-10 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-6441:
--

 Summary: FetchRequest populates buffer of size MinBytes, even if 
response is smaller
 Key: KAFKA-6441
 URL: https://issues.apache.org/jira/browse/KAFKA-6441
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.1
Reporter: Ivan Babrou


We're using the Sarama Go client as a consumer, but I don't think it's 
relevant. The producer is syslog-ng with Kafka output. I'm not quite sure which 
log format Kafka itself is using, but I assume 0.11.0.0, because that's what is 
set in the topic settings.

Our FetchRequest has minSize = 16MB, maxSize = 64, maxWait = 500ms. For a silly 
reason, Kafka decides to reply with at least a minSize buffer containing just 
one 1KB log message. When Sarama was using the older consumer API, everything 
was okay. When we upgraded to the 0.11.0.0 consumer API, consumer traffic for a 
125Mbit/s topic spiked to 55000Mbit/s on the wire and the consumer wasn't even 
able to keep up.

A 1KB message in a 16MB buffer is 1,600,000% overhead.

I don't think there's any valid reason to do this.

It's also mildly annoying that there is no tag 0.11.0.1 in git; looking at 
changes is harder than it should be.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6314) Add a tool to delete kafka based consumer offsets for a given group

2018-01-05 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16314204#comment-16314204
 ] 

Ivan Babrou commented on KAFKA-6314:


Is there a workaround that allows universal alerting for lagging consumers?

> Add a tool to delete kafka based consumer offsets for a given group
> ---
>
> Key: KAFKA-6314
> URL: https://issues.apache.org/jira/browse/KAFKA-6314
> Project: Kafka
>  Issue Type: New Feature
>  Components: consumer, core, tools
>Reporter: Tom Scott
>Priority: Minor
>
> Add a tool to delete Kafka-based consumer offsets for a given group, similar 
> to the reset tool. It could look something like this:
> kafka-consumer-groups --bootstrap-server localhost:9092 --delete-offsets 
> --group somegroup
> The case for this is as follows:
> 1. Consumer group with id: group1 subscribes to topic1
> 2. The group is stopped 
> 3. The subscription changed to topic2 but the id is kept as group1
> Now the output of kafka-consumer-groups --describe for the group will show 
> topic1 even though the group is not subscribed to that topic. This is bad for 
> monitoring, as it will show lag on topic1.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (KAFKA-6414) Inverse replication for replicas that are far behind

2018-01-01 Thread Ivan Babrou (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Babrou updated KAFKA-6414:
---
Description: 
Let's suppose the following starting point:

* 1 topic
* 1 partition
* 1 reader
* 24h retention period
* leader outbound bandwidth is 3x of inbound bandwidth (1x replication + 1x 
reader + 1x slack = total outbound)

In this scenario, when a replica fails and needs to be brought back from 
scratch, you can catch up at 2x inbound bandwidth (1x regular replication + 1x 
slack used).

2x catch-up speed means the replica will reach the point where the leader is 
now in 24h / 2x = 12h. However, in those 12h the oldest 12h of the topic will 
fall off the retention cliff and be deleted. There's absolutely no use for this 
data; it will never be read from the replica in any scenario. And this is not 
even counting the fact that we still need to replicate 12 more hours of data 
that accumulated since we started.

My suggestion is to refill sufficiently out-of-sync replicas backwards from the 
tip: newest segments first, oldest segments last. Then we can stop when we hit 
the retention cliff and replicate far less data. The lower the ratio of 
catch-up bandwidth to inbound bandwidth, the higher the returns would be. This 
will also set a hard cap on catch-up time: it will be no higher than the 
retention period if the catch-up speed is >1x (if it's less, you're forever out 
of ISR anyway).

What exactly "sufficiently out of sync" means in terms of lag is a topic for 
debate. The default segment size is 1GiB; I'd say that being >1 full segment 
behind probably warrants this.

As of now, the solution for slow recovery appears to be to reduce retention to 
speed up recovery, which doesn't seem very friendly.

  was:
Let's suppose the following starting point:

* 1 topic
* 1 partition
* 1 reader
* 24h retention period
* leader outbound bandwidth is 3x of inbound bandwidth (1x replication + 1x 
reader + 1x slack = total outbound)

In this scenario, when replica fails and needs to be brought back from scratch, 
you can catch up at 2x inbound bandwidth (1x regular replication + 1x slack 
used).

2x catch-up speed means replica will be at the point where leader is now in 24h 
/ 2x = 12h. However, in 12h the oldest 12h of the topic will fall out of 
retention cliff and will be deleted. There's absolutely to use for this data, 
it will never be read from the replica in any scenario. And this not even 
including the fact that we still need to replicate 12h more of data that 
accumulated since the time we started.

My suggestion is to refill sufficiently out of sync replicas backwards from the 
tip: newest segments first, oldest segments last. Then we can stop when we hit 
retention cliff and replicate far less data. The lower the ratio of catch-up 
bandwidth to inbound bandwidth, the higher the returns would be. This will also 
set a hard cap on retention time: it will be no higher than retention period if 
catch-up speed if >1x (if it's less, you're forever out of ISR anyway).

What exactly "sufficiently out of sync" means in terms of lag is a topic for a 
debate. The default segment size is 1GiB, I'd say that being >1 full segments 
behind probably warrants this.

As of now, the solution for slow recovery appears to be to reduce retention to 
speed up recovery, which doesn't seem very friendly.


> Inverse replication for replicas that are far behind
> 
>
> Key: KAFKA-6414
> URL: https://issues.apache.org/jira/browse/KAFKA-6414
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Ivan Babrou
>
> Let's suppose the following starting point:
> * 1 topic
> * 1 partition
> * 1 reader
> * 24h retention period
> * leader outbound bandwidth is 3x of inbound bandwidth (1x replication + 1x 
> reader + 1x slack = total outbound)
> In this scenario, when a replica fails and needs to be brought back from 
> scratch, you can catch up at 2x inbound bandwidth (1x regular replication + 
> 1x slack used).
> 2x catch-up speed means the replica will reach the point where the leader is 
> now in 24h / 2x = 12h. However, in those 12h the oldest 12h of the topic will 
> fall off the retention cliff and be deleted. There's absolutely no use for 
> this data; it will never be read from the replica in any scenario. And this 
> is not even counting the fact that we still need to replicate 12 more hours 
> of data that accumulated since we started.
> My suggestion is to refill sufficiently out-of-sync replicas backwards from 
> the tip: newest segments first, oldest segments last. Then we can stop when 
> we hit the retention cliff and replicate far less data. The lower the ratio 
> of catch-up bandwidth to inbound bandwidth, the higher the returns would be. 
> This will also set a hard cap on catch-up time: it will be no higher than 

[jira] [Commented] (KAFKA-6414) Inverse replication for replicas that are far behind

2018-01-01 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307608#comment-16307608
 ] 

Ivan Babrou commented on KAFKA-6414:


I think it's up to people who are going to execute this to hash out the 
details, I'm just a man with an idea.

> Inverse replication for replicas that are far behind
> 
>
> Key: KAFKA-6414
> URL: https://issues.apache.org/jira/browse/KAFKA-6414
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Reporter: Ivan Babrou
>
> Let's suppose the following starting point:
> * 1 topic
> * 1 partition
> * 1 reader
> * 24h retention period
> * leader outbound bandwidth is 3x of inbound bandwidth (1x replication + 1x 
> reader + 1x slack = total outbound)
> In this scenario, when a replica fails and needs to be brought back from 
> scratch, you can catch up at 2x inbound bandwidth (1x regular replication + 
> 1x slack used).
> 2x catch-up speed means the replica will reach the point where the leader is 
> now in 24h / 2x = 12h. However, in those 12h the oldest 12h of the topic will 
> fall off the retention cliff and be deleted. There's absolutely no use for 
> this data; it will never be read from the replica in any scenario. And this 
> is not even counting the fact that we still need to replicate 12 more hours 
> of data that accumulated since we started.
> My suggestion is to refill sufficiently out-of-sync replicas backwards from 
> the tip: newest segments first, oldest segments last. Then we can stop when 
> we hit the retention cliff and replicate far less data. The lower the ratio 
> of catch-up bandwidth to inbound bandwidth, the higher the returns would be. 
> This will also set a hard cap on catch-up time: it will be no higher than the 
> retention period if the catch-up speed is >1x (if it's less, you're forever 
> out of ISR anyway).
> What exactly "sufficiently out of sync" means in terms of lag is a topic for 
> debate. The default segment size is 1GiB; I'd say that being >1 full segment 
> behind probably warrants this.
> As of now, the solution for slow recovery appears to be to reduce retention 
> to speed up recovery, which doesn't seem very friendly.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6414) Inverse replication for replicas that are far behind

2018-01-01 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-6414:
--

 Summary: Inverse replication for replicas that are far behind
 Key: KAFKA-6414
 URL: https://issues.apache.org/jira/browse/KAFKA-6414
 Project: Kafka
  Issue Type: Bug
  Components: replication
Reporter: Ivan Babrou


Let's suppose the following starting point:

* 1 topic
* 1 partition
* 1 reader
* 24h retention period
* leader outbound bandwidth is 3x of inbound bandwidth (1x replication + 1x 
reader + 1x slack = total outbound)

In this scenario, when a replica fails and needs to be brought back from 
scratch, you can catch up at 2x inbound bandwidth (1x regular replication + 1x 
slack used).

2x catch-up speed means the replica will reach the point where the leader is 
now in 24h / 2x = 12h. However, in those 12h the oldest 12h of the topic will 
fall off the retention cliff and be deleted. There's absolutely no use for this 
data; it will never be read from the replica in any scenario. And this is not 
even counting the fact that we still need to replicate 12 more hours of data 
that accumulated since we started.

My suggestion is to refill sufficiently out-of-sync replicas backwards from the 
tip: newest segments first, oldest segments last. Then we can stop when we hit 
the retention cliff and replicate far less data. The lower the ratio of 
catch-up bandwidth to inbound bandwidth, the higher the returns would be. This 
will also set a hard cap on catch-up time: it will be no higher than the 
retention period if the catch-up speed is >1x (if it's less, you're forever out 
of ISR anyway).

What exactly "sufficiently out of sync" means in terms of lag is a topic for 
debate. The default segment size is 1GiB; I'd say that being >1 full segment 
behind probably warrants this.

As of now, the solution for slow recovery appears to be to reduce retention to 
speed up recovery, which doesn't seem very friendly.
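A toy sketch of the proposed ordering only (hypothetical names; this is not
replica fetcher code): walk segments newest-first and stop once a segment's
newest data is already past the retention cutoff.

{code}
object InverseCatchUp {
  case class Segment(baseOffset: Long, maxTimestampMs: Long)

  def segmentsToCopy(segments: Seq[Segment], nowMs: Long, retentionMs: Long): Seq[Segment] = {
    val cutoffMs = nowMs - retentionMs
    segments
      .sortBy(-_.baseOffset)                   // newest segments first
      .takeWhile(_.maxTimestampMs >= cutoffMs) // stop at the retention cliff
  }
}
{code}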



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6013) Controller getting stuck

2017-10-10 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199121#comment-16199121
 ] 

Ivan Babrou commented on KAFKA-6013:


I forced preferred leader election and current controller switched 9 -> 7 -> 0.

> Controller getting stuck
> 
>
> Key: KAFKA-6013
> URL: https://issues.apache.org/jira/browse/KAFKA-6013
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Ivan Babrou
>  Labels: reliability
>
> It looks like a new issue in 0.11.0.0, and 0.11.0.1 still has it.
> We upgraded one of the clusters from 0.11.0.0 to 0.11.0.1 by shutting down 28 
> machines at once (a single rack). When the nodes came up, none of them 
> progressed past these log lines:
> {noformat}
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka version : 0.11.0.1 
> (org.apache.kafka.common.utils.AppInfoParser)
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka commitId : 
> c2a0d5f9b1f45bf5 (org.apache.kafka.common.utils.AppInfoParser)
> Oct 05 02:17:42 mybroker14 kafka[32940]: INFO [Kafka Server 10014], started 
> (kafka.server.KafkaServer)
> {noformat}
> There was no indication in the controller node's logs that it picked up the 
> rebooted nodes. This happened multiple times during the upgrade: once per 
> rack, plus some on top of that.
> The reboot took ~20m; all nodes in a single rack rebooted in parallel.
> The fix was to restart the controller node, but that did not go cleanly 
> either:
> {noformat}
> ivan@mybroker26:~$ sudo journalctl --since 01:00 -u kafka | fgrep 'Error 
> during controlled shutdown' -A1
> Oct 05 01:57:41 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:57:46 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
> controlled shutdown after the previous attempt failed... 
> (kafka.server.KafkaServer)
> --
> Oct 05 01:58:16 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:58:18 mybroker26 kafka[37409]: INFO Rolled new log segment for 
> 'requests-40' in 3 ms. (kafka.log.Log)
> --
> Oct 05 01:58:51 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
> during controlled shutdown, possibly because leader movement took longer than 
> the configured controller.socket.timeout.ms and/or request.timeout.ms: 
> Connection to 10026 was disconnected before the response was read 
> (kafka.server.KafkaServer)
> Oct 05 01:58:56 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
> controlled shutdown after the previous attempt failed... 
> (kafka.server.KafkaServer)
> {noformat}
> I'm unable to reproduce the issue by just restarting or even rebooting one 
> broker; the controller picks it up:
> {noformat}
> Oct 05 03:18:18 mybroker83 kafka[37402]: INFO [Controller 10083]: Newly added 
> brokers: 10001, deleted brokers: , all live brokers: ...
> {noformat}
> KAFKA-5028 happened in 0.11.0.0, so it's likely related.
> cc [~ijuma]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-6013) Controller getting stuck

2017-10-10 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16199031#comment-16199031
 ] 

Ivan Babrou commented on KAFKA-6013:


I restarted one node again today and the controller forgot to pick it up after 
the restart. I had to restart controller nodes twice, because each new 
controller also managed to forget about the previous controller node I 
restarted. On the third node, everything finally came to order. All these 
machines were up for ~5d.

One observation that I don't understand: 
kafka.controller:type=KafkaController,name=ControllerState reports state 9 (ISR 
change) all the time. Shouldn't it be 0 (idle) most of the time?

Stuck controllers have this stack for controller thread:

{noformat}
Oct 10 17:05:17 mybroker70 kafka[37433]: "controller-event-thread" #77 prio=5 
os_prio=0 tid=0x7f5cda487800 nid=0x963f in Object.wait() 
[0x7f5aaeced000]
Oct 10 17:05:17 mybroker70 kafka[37433]:java.lang.Thread.State: WAITING (on 
object monitor)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
java.lang.Object.wait(Native Method)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
java.lang.Object.wait(Object.java:502)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1406)
Oct 10 17:05:17 mybroker70 kafka[37433]: - locked <0x0007b2e00540> 
(a org.apache.zookeeper.ClientCnxn$Packet)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1210)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.apache.zookeeper.ZooKeeper.getData(ZooKeeper.java:1241)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkConnection.readData(ZkConnection.java:125)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:1104)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient$12.call(ZkClient.java:1100)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:991)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1100)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:1095)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.utils.ZkUtils.readDataMaybeNull(ZkUtils.scala:660)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.KafkaController$IsrChangeNotification.getTopicAndPartition(KafkaController.scala:1329)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.KafkaController$IsrChangeNotification.$anonfun$process$26(KafkaController.scala:1310)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.KafkaController$IsrChangeNotification$$Lambda$1253/1422719045.apply(Unknown
 Source)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:241)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.TraversableLike$$Lambda$391/1306246648.apply(Unknown Source)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.Iterator.foreach(Iterator.scala:929)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.Iterator.foreach$(Iterator.scala:929)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1417)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.IterableLike.foreach(IterableLike.scala:71)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.IterableLike.foreach$(IterableLike.scala:70)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.AbstractIterable.foreach(Iterable.scala:54)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.TraversableLike.flatMap(TraversableLike.scala:241)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.TraversableLike.flatMap$(TraversableLike.scala:238)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.KafkaController$IsrChangeNotification.process(KafkaController.scala:1310)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:50)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.controller.ControllerEventManager$ControllerEventThread$$Lambda$395/1856206530.apply$mcV$sp(Unknown
 Source)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
Oct 10 17:05:17 mybroker70 kafka[37433]: at 
kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
Oct 10 17:05:17 

[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2017-10-06 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16195247#comment-16195247
 ] 

Ivan Babrou commented on KAFKA-3359:


[~ijuma], good to know, I'll bump the setting for our cluster. Is there any 
reason to read all of the data from a partition during recovery?

> Parallel log-recovery of un-flushed segments on startup
> ---
>
> Key: KAFKA-3359
> URL: https://issues.apache.org/jira/browse/KAFKA-3359
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1
>Reporter: Vamsi Subhash Achanta
>Assignee: Jay Kreps
>
> On startup, the log segments within a logDir are currently loaded 
> sequentially when there is an unclean shutdown. This takes a lot of time, as 
> logSegment.recover(..) is called for every segment, and for brokers which 
> have many partitions the time taken will be very high (we have noticed 
> ~40mins for 2k partitions).
> https://github.com/apache/kafka/pull/1035
> This pull request makes the log-segment load parallel, with two 
> configurable properties: "log.recovery.threads" and 
> "log.recovery.max.interval.ms".
> Logic:
> 1. Have a threadpool defined of fixed length (log.recovery.threads)
> 2. Submit the logSegment recovery as a job to the threadpool and add the 
> returned future to a job list
> 3. Wait until all the jobs are done within the allowed time 
> (log.recovery.max.interval.ms - default set to Long.Max).
> 4. If they are done and the futures are all null (meaning that the jobs 
> completed successfully), it is considered done.
> 5. If any of the recovery jobs failed, it is logged and 
> LogRecoveryFailedException is thrown.
> 6. If the timeout is reached, LogRecoveryFailedException is thrown.
> The logic is backward compatible with the current sequential implementation, 
> as the default thread count is set to 1.
> PS: I am new to Scala and the code might look Java-ish, but I will be happy 
> to make changes from code review.
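As a rough sketch of the logic described in the quoted proposal (hypothetical
names and shapes, not the actual pull request): run recovery jobs on a
fixed-size pool sized by "log.recovery.threads" and fail the whole load if any
job fails or the "log.recovery.max.interval.ms" deadline passes.

{code}
import java.util.concurrent.{ExecutionException, Executors, TimeUnit}

object ParallelLogRecovery {
  class LogRecoveryFailedException(msg: String, cause: Throwable = null)
    extends RuntimeException(msg, cause)

  def recoverSegments(recoveryJobs: Seq[() => Unit],
                      recoveryThreads: Int,
                      maxIntervalMs: Long = Long.MaxValue): Unit = {
    val pool = Executors.newFixedThreadPool(recoveryThreads)  // log.recovery.threads
    try {
      // Submit one job per unflushed segment and remember the futures.
      val futures = recoveryJobs.map { job =>
        pool.submit(new Runnable { def run(): Unit = job() })
      }
      pool.shutdown()
      // Wait for all jobs within log.recovery.max.interval.ms.
      if (!pool.awaitTermination(maxIntervalMs, TimeUnit.MILLISECONDS))
        throw new LogRecoveryFailedException("timed out waiting for log recovery")
      // Surface the first failure, if any.
      futures.foreach { f =>
        try f.get()
        catch {
          case e: ExecutionException =>
            throw new LogRecoveryFailedException("segment recovery failed", e.getCause)
        }
      }
    } finally pool.shutdownNow()
  }
}
{code}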



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-6013) Controller getting stuck

2017-10-04 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-6013:
--

 Summary: Controller getting stuck
 Key: KAFKA-6013
 URL: https://issues.apache.org/jira/browse/KAFKA-6013
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.11.0.0, 0.11.0.1
Reporter: Ivan Babrou


It looks like a new issue in 0.11.0.0, and 0.11.0.1 still has it.

We upgraded one of the clusters from 0.11.0.0 to 0.11.0.1 by shutting down 28 
machines at once (a single rack). When the nodes came up, none of them 
progressed past these log lines:

{noformat}
Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka version : 0.11.0.1 
(org.apache.kafka.common.utils.AppInfoParser)
Oct 05 02:17:42 mybroker14 kafka[32940]: INFO Kafka commitId : c2a0d5f9b1f45bf5 
(org.apache.kafka.common.utils.AppInfoParser)
Oct 05 02:17:42 mybroker14 kafka[32940]: INFO [Kafka Server 10014], started 
(kafka.server.KafkaServer)
{noformat}

There was no indication in the controller node's logs that it picked up the 
rebooted nodes. This happened multiple times during the upgrade: once per rack, 
plus some on top of that.

The reboot took ~20m; all nodes in a single rack rebooted in parallel.

The fix was to restart the controller node, but that did not go cleanly either:

{noformat}
ivan@mybroker26:~$ sudo journalctl --since 01:00 -u kafka | fgrep 'Error during 
controlled shutdown' -A1
Oct 05 01:57:41 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
during controlled shutdown, possibly because leader movement took longer than 
the configured controller.socket.timeout.ms and/or request.timeout.ms: 
Connection to 10026 was disconnected before the response was read 
(kafka.server.KafkaServer)
Oct 05 01:57:46 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
controlled shutdown after the previous attempt failed... 
(kafka.server.KafkaServer)
--
Oct 05 01:58:16 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
during controlled shutdown, possibly because leader movement took longer than 
the configured controller.socket.timeout.ms and/or request.timeout.ms: 
Connection to 10026 was disconnected before the response was read 
(kafka.server.KafkaServer)
Oct 05 01:58:18 mybroker26 kafka[37409]: INFO Rolled new log segment for 
'requests-40' in 3 ms. (kafka.log.Log)
--
Oct 05 01:58:51 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Error 
during controlled shutdown, possibly because leader movement took longer than 
the configured controller.socket.timeout.ms and/or request.timeout.ms: 
Connection to 10026 was disconnected before the response was read 
(kafka.server.KafkaServer)
Oct 05 01:58:56 mybroker26 kafka[37409]: WARN [Kafka Server 10026], Retrying 
controlled shutdown after the previous attempt failed... 
(kafka.server.KafkaServer)
{noformat}

I'm unable to reproduce the issue by just restarting or even rebooting one 
broker; the controller picks it up:

{noformat}
Oct 05 03:18:18 mybroker83 kafka[37402]: INFO [Controller 10083]: Newly added 
brokers: 10001, deleted brokers: , all live brokers: ...
{noformat}

KAFKA-5028 happened in 0.11.0.0, so it's likely related.

cc [~ijuma]



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3359) Parallel log-recovery of un-flushed segments on startup

2017-10-04 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16192405#comment-16192405
 ] 

Ivan Babrou commented on KAFKA-3359:


It's me again. We hit the issue again and I googled my way back to this issue. 
I wanted to add that Kafka re-reads full partitions to recover, and it takes 20 
minutes on the smallest ones, which are around 1.5TB:

{noformat}
Oct 05 01:10:43 mybroker14 kafka[32940]: WARN Found a corrupted index file due 
to requirement failed: Corrupt index found, index file 
(/state/kafka/http/requests-47/0001246285678992.index) has non-zero size 
but the last offset is 1246285678992 which is no larger than the base offset 
1246285678992.}. deleting 
/state/kafka/http/requests-47/0001246285678992.timeindex, 
/state/kafka/http/requests-47/0001246285678992.index, and 
/state/kafka/http/requests-47/0001246285678992.txnindex and rebuilding 
index... (kafka.log.Log)
Oct 05 01:10:43 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/0001246285678992.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:10:47 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246283087840 in log requests-47. (kafka.log.Log)
Oct 05 01:31:29 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246284384425 in log requests-47. (kafka.log.Log)
Oct 05 01:31:29 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/0001246283087840.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:36 mybroker14 kafka[32940]: INFO Recovering unflushed segment 
1246285678992 in log requests-47. (kafka.log.Log)
Oct 05 01:31:36 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/0001246284384425.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:42 mybroker14 kafka[32940]: INFO Loading producer state from 
offset 1246286680535 for partition requests-47 with message format version 0 
(kafka.log.Log)
Oct 05 01:31:42 mybroker14 kafka[32940]: INFO Loading producer state from 
snapshot file '/state/kafka/http/requests-47/0001246285678992.snapshot' for 
partition requests-47 (kafka.log.ProducerStateManager)
Oct 05 01:31:43 mybroker14 kafka[32940]: INFO Completed load of log requests-47 
with 719 log segments, log start offset 1245351135299 and log end offset 
1246286680535 in 1260684 ms (kafka.log.Log)
{noformat}

> Parallel log-recovery of un-flushed segments on startup
> ---
>
> Key: KAFKA-3359
> URL: https://issues.apache.org/jira/browse/KAFKA-3359
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.2.2, 0.9.0.1
>Reporter: Vamsi Subhash Achanta
>Assignee: Jay Kreps
>
> On startup, the log segments within a logDir are currently loaded 
> sequentially when there is an unclean shutdown. This takes a long time 
> because logSegment.recover(..) is called for every segment, and for brokers 
> with many partitions the total time is very high (we have noticed ~40 mins 
> for 2k partitions).
> https://github.com/apache/kafka/pull/1035
> This pull request makes log-segment loading parallel, with two configurable 
> properties "log.recovery.threads" and "log.recovery.max.interval.ms".
> Logic (see the sketch below):
> 1. Have a threadpool defined of fixed length (log.recovery.threads)
> 2. Submit the logSegment recovery as a job to the threadpool and add the 
> future returned to a job list
> 3. Wait till all the jobs are done within the requested time 
> (log.recovery.max.interval.ms - default set to Long.Max).
> 4. If they are done and the futures are all null (meaning that the jobs 
> completed successfully), it is considered done.
> 5. If any of the recovery jobs failed, it is logged and 
> LogRecoveryFailedException is thrown.
> 6. If the timeout is reached, LogRecoveryFailedException is thrown.
> The logic is backward compatible with the current sequential implementation, 
> as the default thread count is set to 1.
> PS: I am new to Scala and the code might look Java-ish, but I will be happy 
> to address code review changes.
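
For reference, a rough sketch of the recovery loop described in the list above. 
This is not the actual patch; "log.recovery.threads" and 
"log.recovery.max.interval.ms" are the proposed names, not shipped configs, and 
the per-segment work is abstracted behind a function argument:

{noformat}
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// `recoverOne` stands in for the per-segment work (LogSegment.recover in Kafka).
def recoverInParallel[S](segments: Seq[S],
                         recoverOne: S => Unit,
                         numThreads: Int,   // proposed "log.recovery.threads"
                         maxWaitMs: Long    // proposed "log.recovery.max.interval.ms"
                        ): Unit = {
  val pool = Executors.newFixedThreadPool(numThreads)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
  try {
    // One recovery job per segment; Await rethrows the first job failure
    // and throws a TimeoutException if the deadline is exceeded.
    val jobs = segments.map(s => Future(recoverOne(s)))
    Await.result(Future.sequence(jobs), maxWaitMs.millis)
  } finally {
    pool.shutdown()
  }
}
{noformat}

With numThreads = 1 this degenerates to the current sequential behaviour, which 
is what makes the proposal backward compatible.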



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-3039) Temporary loss of leader resulted in log being completely truncated

2017-08-30 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16147070#comment-16147070
 ] 

Ivan Babrou commented on KAFKA-3039:


We also experienced this: out of 28 upgraded nodes in one rack, 4 nodes decided 
to nuke one partition each (a different partition on each node):

{noformat}
2017-08-30T10:17:29.509 node-93 WARN [ReplicaFetcherThread-0-10042]: Based on 
follower's leader epoch, leader replied with an unknown offset in requests-48. 
High watermark 0 will be used for truncation. 
(kafka.server.ReplicaFetcherThread)
2017-08-30T10:17:29.510 node-93 INFO Truncating log requests-48 to offset 0. 
(kafka.log.Log)
--
2017-08-30T10:17:29.536 node-93 WARN [ReplicaFetcherThread-0-10082]: Based on 
follower's leader epoch, leader replied with an unknown offset in requests-80. 
High watermark 0 will be used for truncation. 
(kafka.server.ReplicaFetcherThread)
2017-08-30T10:17:29.536 node-93 INFO Truncating log requests-80 to offset 0. 
(kafka.log.Log)
--
2017-08-30T10:26:32.203 node-87 WARN [ReplicaFetcherThread-2-10056]: Based on 
follower's leader epoch, leader replied with an unknown offset in requests-82. 
High watermark 0 will be used for truncation. 
(kafka.server.ReplicaFetcherThread)
2017-08-30T10:26:32.204 node-87 INFO Truncating log requests-82 to offset 0. 
(kafka.log.Log)
--
2017-08-30T10:27:31.755 node-89 WARN [ReplicaFetcherThread-3-10055]: Based on 
follower's leader epoch, leader replied with an unknown offset in requests-79. 
High watermark 0 will be used for truncation. 
(kafka.server.ReplicaFetcherThread)
2017-08-30T10:27:31.756 node-89 INFO Truncating log requests-79 to offset 0. 
(kafka.log.Log)
{noformat}

This was a rolling upgrade from 0.10.2.0 to 0.11.0.0. Nodes that truncated their 
logs were not leaders before the upgrade (not even preferred leaders).

> Temporary loss of leader resulted in log being completely truncated
> ---
>
> Key: KAFKA-3039
> URL: https://issues.apache.org/jira/browse/KAFKA-3039
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
> Environment: Debian 3.2.54-2 x86_64 GNU/Linux
>Reporter: Imran Patel
>Priority: Critical
>  Labels: reliability
>
> We had an event recently where the temporary loss of a leader for a 
> partition (during a manual restart) resulted in the leader coming back with 
> no high watermark state and truncating its log to zero. Logs (attached below) 
> indicate that it did have the data but not the commit state. How is this 
> possible?
> Leader (broker 3)
> [2015-12-18 21:19:44,666] INFO Completed load of log messages-14 with log end 
> offset 14175963374 (kafka.log.Log)
> [2015-12-18 21:19:45,170] INFO Partition [messages,14] on broker 3: No 
> checkpointed highwatermark is found for partition [messages,14] 
> (kafka.cluster.Partition)
> [2015-12-18 21:19:45,238] INFO Truncating log messages-14 to offset 0. 
> (kafka.log.Log)
> [2015-12-18 21:20:34,066] INFO Partition [messages,14] on broker 3: Expanding 
> ISR for partition [messages,14] from 3 to 3,10 (kafka.cluster.Partition)
> Replica (broker 10)
> [2015-12-18 21:19:19,525] INFO Partition [messages,14] on broker 10: 
> Shrinking ISR for partition [messages,14] from 3,10,4 to 10,4 
> (kafka.cluster.Partition)
> [2015-12-18 21:20:34,049] ERROR [ReplicaFetcherThread-0-3], Current offset 
> 14175984203 for partition [messages,14] out of range; reset offset to 35977 
> (kafka.server.ReplicaFetcherThread)
> [2015-12-18 21:20:34,033] WARN [ReplicaFetcherThread-0-3], Replica 10 for 
> partition [messages,14] reset its fetch offset from 14175984203 to current 
> leader 3's latest offset 35977 (kafka.server.ReplicaFetcherThread)
> Some relevant config parameters:
> offsets.topic.replication.factor = 3
> offsets.commit.required.acks = -1
> replica.high.watermark.checkpoint.interval.ms = 5000
> unclean.leader.election.enable = false



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5747) Broker crashes on startup when trying to parse empty snapshot files

2017-08-17 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5747?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16131021#comment-16131021
 ] 

Ivan Babrou commented on KAFKA-5747:


This is especially bad because Kafka restarts the recovery process from the 
beginning, and it doesn't get any faster between iterations. It seems that 
recovery state is not written until recovery is fully complete. Imagine waiting 
1h to recover 1k partitions and then seeing this error when just 10 partitions remain.
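
For anyone hitting this, a minimal sketch of automating the workaround from the 
issue description below (deleting zero-length .snapshot files before starting 
the broker); the log directory path is a placeholder:

{noformat}
import java.io.File

// Walk the Kafka log dirs and remove empty .snapshot files that would
// otherwise crash startup; mirrors `find /logs/dir -name \*.snapshot -size 0 -delete`.
def deleteEmptySnapshots(dir: File): Unit = {
  Option(dir.listFiles()).getOrElse(Array.empty[File]).foreach { f =>
    if (f.isDirectory) {
      deleteEmptySnapshots(f)                      // recurse into partition dirs
    } else if (f.getName.endsWith(".snapshot") && f.length == 0) {
      println(s"Deleting empty snapshot file ${f.getAbsolutePath}")
      f.delete()
    }
  }
}

deleteEmptySnapshots(new File("/disk/data/kafka"))  // placeholder log dir
{noformat}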

> Broker crashes on startup when trying to parse empty snapshot files
> ---
>
> Key: KAFKA-5747
> URL: https://issues.apache.org/jira/browse/KAFKA-5747
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.1.0, 0.11.0.0
>Reporter: Lukasz Mierzwa
>
> A broker server crash can sometimes result in empty snapshot files on disk 
> (depending on FS, barrier settings, etc). When Kafka tries to parse such a 
> file it crashes, gets restarted and crashes again; this keeps happening until 
> you remove the empty snapshot files with:
> {noformat}
> find /logs/dir -name \*.snapshot -size 0 -delete
> {noformat}
> Log:
> {noformat}
> Aug 15 22:52:11 localhost kafka[23681]: INFO Recovering unflushed segment 0 
> in log __consumer_offsets-16. (kafka.log.Log)
> Aug 15 22:52:11 localhost kafka[23681]: INFO Loading producer state from 
> offset 1207 for partition __consumer_offsets-16 with message format version 0 
> (kafka.log.Log)
> Aug 15 22:52:11 localhost kafka[23681]: INFO Completed load of log 
> __consumer_offsets-16 with 1 log segments, log start offset 0 and log end 
> offset 1207 in 15 ms (kafka.log.Log)
> Aug 15 22:52:11 localhost kafka[23681]: WARN Found a corrupted index file due 
> to requirement failed: Corrupt index found, index file 
> (/disk/data/kafka/mycluster/mytopic-64/300519800823.index) has 
> non-zero size but the last offset is 300519800823 which is no larger than the 
> base offset 300519800823.}. deleting 
> /disk/data/kafka/mycluster/mytopic-64/300519800823.timeindex, 
> /disk/data/kafka/mycluster/mytopic-64/300519800823.index, and 
> /disk/data/kafka/mycluster/mytopic-64/300519800823.txnindex and 
> rebuilding index... (kafka.log.Log)
> Aug 15 22:52:11 localhost kafka[23681]: INFO Loading producer state from 
> snapshot file 300519800823.snapshot for partition mytopic-64 
> (kafka.log.ProducerStateManager)
> Aug 15 22:52:11 localhost kafka[23681]: ERROR There was an error in one of 
> the threads during logs loading: 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException (kafka.log.LogManager)
> Aug 15 22:52:11 localhost kafka[23681]: FATAL [Kafka Server 10139], Fatal 
> error during KafkaServer startup. Prepare to shutdown 
> (kafka.server.KafkaServer)
> Aug 15 22:52:11 localhost kafka[23681]: 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
> Aug 15 22:52:11 localhost kafka[23681]: at 
> org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.ProducerStateManager$.readSnapshot(ProducerStateManager.scala:289)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.ProducerStateManager.loadFromSnapshot(ProducerStateManager.scala:440)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.ProducerStateManager.truncateAndReload(ProducerStateManager.scala:499)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.Log.recoverSegment(Log.scala:327)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.Log.$anonfun$loadSegmentFiles$3(Log.scala:314)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:789)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:191)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:788)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.Log.loadSegmentFiles(Log.scala:272)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.Log.loadSegments(Log.scala:376)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.Log.(Log.scala:179)
> Aug 15 22:52:11 localhost kafka[23681]: at 
> kafka.log.Log$.apply(Log.scala:1581)
> Aug 15 22:52:11 localhost kafka[23681]: at 

[jira] [Commented] (KAFKA-5687) Retention settings are inconsistent between broker and topic

2017-08-01 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16109114#comment-16109114
 ] 

Ivan Babrou commented on KAFKA-5687:


By "impossible" I mean "not documented". In the docs:

* Topic level retention settings:

!screenshot-2.png|width=700!

* Broker level retention settings:

!screenshot-1.png|width=700!

> Retention settings are inconsistent between broker and topic
> 
>
> Key: KAFKA-5687
> URL: https://issues.apache.org/jira/browse/KAFKA-5687
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ivan Babrou
> Attachments: screenshot-1.png, screenshot-2.png
>
>
> Topic configurations:
> * retention.ms
> Broker configurations:
> * log.retention.hours
> * log.retention.minutes
> * log.retention.ms
> First of all, it's impossible to set topic retention time in hours or 
> minutes. Second, a "seconds" version is missing between "ms" and "minutes".
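
For example, these two settings express the same 7-day retention, but the names 
don't line up: broker settings carry the "log." prefix and have hour/minute 
variants, while the topic override is ms-only (values here are illustrative):

{noformat}
# broker-level (server.properties)
log.retention.hours=168

# topic-level override (ms only)
retention.ms=604800000
{noformat}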



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (KAFKA-5633) Clarify another scenario of unclean leader election

2017-07-24 Thread Ivan Babrou (JIRA)
Ivan Babrou created KAFKA-5633:
--

 Summary: Clarify another scenario of unclean leader election
 Key: KAFKA-5633
 URL: https://issues.apache.org/jira/browse/KAFKA-5633
 Project: Kafka
  Issue Type: Bug
Reporter: Ivan Babrou


You don't need to lose all replicas of a partition to run into the unclean 
leader election scenario; losing just one is enough. The leading replica can get 
into a state where it kicks everything else out of the ISR because it has 
network issues, and then it can simply die, leaving the partition leaderless.

This is what we saw:

{noformat}
Jul 24 18:05:53 broker-10029 kafka[4104]: INFO Partition [requests,9] on broker 
10029: Shrinking ISR for partition [requests,9] from 10029,10016,10072 to 10029 
(kafka.cluster.Partition)
{noformat}

{noformat}
Topic: requests Partition: 9Leader: -1  Replicas: 
10029,10072,10016 Isr: 10029
{noformat}

Disabling unclean leader election is the default behavior in 0.11.0.0+, but I 
don't think the docs are completely clear about the implications. Before the 
change you could silently lose data if the scenario described above happened; 
now you can grind your whole pipeline to a halt when just one node has issues. 
My understanding is that to avoid this you'd want min.insync.replicas > 1 and 
acks > 1 (probably all), as in the sketch below.
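
A minimal producer-side sketch of what I mean, assuming a placeholder bootstrap 
address and topic name; min.insync.replicas itself is a topic/broker setting and 
is not shown here:

{noformat}
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-10029:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
// acks=all: the leader waits for the full ISR, bounded below by the topic's
// min.insync.replicas, before acknowledging the write.
props.put(ProducerConfig.ACKS_CONFIG, "all")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("requests", "key", "value"))
producer.close()
{noformat}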

It's also worth documenting how to force leader election when unclean leader 
election is disabled. I assume it can be accomplished by switching 
unclean.leader.election.enable on and off again for the problematic topic, but 
being crystal clear about this in the docs would be tremendously helpful.
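
A hedged sketch of that toggle using the 0.11 AdminClient (the same can be done 
with kafka-configs.sh); the topic name and bootstrap address are placeholders, 
and note that alterConfigs in this form replaces the full set of topic 
overrides, so this is illustrative rather than a drop-in tool:

{noformat}
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, Config, ConfigEntry}
import org.apache.kafka.common.config.ConfigResource

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-10029:9092")
val admin = AdminClient.create(props)

// Temporarily allow unclean election on the stuck topic, then revert.
val topic = new ConfigResource(ConfigResource.Type.TOPIC, "requests")
def setUnclean(enabled: Boolean): Unit = {
  // Non-incremental: include any other topic overrides you want to keep.
  val cfg = new Config(Collections.singleton(
    new ConfigEntry("unclean.leader.election.enable", enabled.toString)))
  admin.alterConfigs(Collections.singletonMap(topic, cfg)).all().get()
}

setUnclean(true)   // lets an out-of-sync replica take leadership
setUnclean(false)  // restore the safe default once the partition has a leader
admin.close()
{noformat}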



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (KAFKA-5348) kafka-consumer-groups.sh refuses to remove groups without ids

2017-06-23 Thread Ivan Babrou (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16061802#comment-16061802
 ] 

Ivan Babrou commented on KAFKA-5348:


I don't know how to reproduce it, unfortunately. Is there a reason to check for 
/ids on group removal? Why not just remove the group if there are no active 
partition owners?
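
For context, my understanding of the old ZooKeeper-based consumer's group layout 
(paths abbreviated): /ids entries are ephemeral and vanish once no consumer is 
connected, while /offsets sticks around, which is exactly the state shown in the 
description below.

{noformat}
/consumers/<group>/ids/<consumer-id>       <- ephemeral, only while consumers are connected
/consumers/<group>/owners/<topic>/<part>   <- current partition ownership
/consumers/<group>/offsets/<topic>/<part>  <- committed offsets, persist after consumers leave
{noformat}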

> kafka-consumer-groups.sh refuses to remove groups without ids
> -
>
> Key: KAFKA-5348
> URL: https://issues.apache.org/jira/browse/KAFKA-5348
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 0.10.2.0
>Reporter: Ivan Babrou
>Assignee: Vahid Hashemian
>
> In zookeeper I have:
> {noformat}
> [zk: foo(CONNECTED) 37] ls /kafka/logs/consumers/console-consumer-4107
> [offsets]
> {noformat}
> This consumer group also shows up when I list consumer groups:
> {noformat}
> $ /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 
> foo:2181/kafka/logs --list | fgrep console-consumer-4107
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> console-consumer-4107
> {noformat}
> But I cannot remove this group:
> {noformat}
> $ /usr/local/kafka/bin/kafka-consumer-groups.sh --zookeeper 
> 36zk1.in.pdx.cfdata.org:2181/kafka/logs --delete --group console-consumer-4107
> Note: This will only show information about consumers that use ZooKeeper (not 
> those using the Java consumer API).
> Error: Delete for group 'console-consumer-4107' failed because group does not 
> exist.
> {noformat}
> I ran tcpdump and it turns out that the /ids path is checked:
> {noformat}
> $.e.P.fP...&..<...//kafka/logs/consumers/console-consumer-4107/ids.
> {noformat}
> I think Kafka should not check for /ids; it should check for / instead here.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)