[jira] [Resolved] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-04 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates resolved KAFKA-14660.
-
Resolution: Fixed

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 3.5.0
>
>
> Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422|https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0]
>  * [Original PR|https://github.com/apache/kafka/pull/7414]
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Reopened] (KAFKA-14660) Divide by zero security vulnerability (sonatype-2019-0422)

2023-02-02 Thread Andy Coates (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14660?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates reopened KAFKA-14660:
-

The issue here is more the Sonatype security vulnerability report than any 
impossible-to-reach divide-by-zero issue. Unfortunately, I'm struggling to find 
information on _how_ to mark the vulnerability resolved in Sonatype.  This was 
why I was suggesting opening and merging the PR, as it seems the PR is the 
cause of the report.

I realise the PR's solution wasn't ideal. Hence I was suggesting to merge and 
put in a second change afterwards to fix the fix, so to speak.

If you've already submitted a fix for the DBZ, then I see two potential ways 
forward:
 # work out how to inform Sonatype the issue is fixed:
 ## There is a [Report 
correction|https://ossindex.sonatype.org/doc/report-vulnerability] link on the 
bug report.  Maybe you, or I if you let me know the PR you fixed the DBZ in, 
could use this to report that it's been fixed?
 ## Maybe just tagging the [Sonatype 
issue|https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0]
 in your PR would be enough?
 ## Is there someone at Confluent who knows about this stuff that you could talk to?
 # reopen, 'adjust' and merge the original PR... hopefully triggering Sonatype 
to mark the issue resolved.

> Divide by zero security vulnerability (sonatype-2019-0422)
> --
>
> Key: KAFKA-14660
> URL: https://issues.apache.org/jira/browse/KAFKA-14660
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.3.2
>Reporter: Andy Coates
>Assignee: Matthias J. Sax
>Priority: Minor
> Fix For: 3.5.0
>
>
> Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR 
> and, because the PR was never merged, is now reporting it as a security 
> vulnerability in the latest Kafka Streams library.
>  
> See:
>  * [Vulnerability: 
> sonatype-2019-0422|https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0]
>  * [Original PR|https://github.com/apache/kafka/pull/7414]
>  
> While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
> divide-by-zero is not really an issue, the fact that it's now being reported 
> as a vulnerability is, especially with regulators.
> PITA, but we should consider either getting this vulnerability removed 
> (Google wasn't very helpful in providing info on how to do this), or fixed 
> (again, not sure how to tag the fix as fixing this issue).  One option may 
> just be to reopen the PR and merge (and then fix forward by switching it to 
> throw an exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-14660) Divide by zero security vulnerability

2023-01-30 Thread Andy Coates (Jira)
Andy Coates created KAFKA-14660:
---

 Summary: Divide by zero security vulnerability
 Key: KAFKA-14660
 URL: https://issues.apache.org/jira/browse/KAFKA-14660
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.3.2
Reporter: Andy Coates


Looks like Sonatype has picked up a "Divide by Zero" issue reported in a PR 
and, because the PR was never merged, is now reporting it as a security 
vulnerability in the latest Kafka Streams library.

 

See:
 * [Vulnerability: 
sonatype-2019-0422|https://ossindex.sonatype.org/vulnerability/sonatype-2019-0422?component-type=maven=org.apache.kafka%2Fkafka-streams_source=ossindex-client_medium=integration_content=1.7.0]

 * [Original PR|https://github.com/apache/kafka/pull/7414]

 

While it looks from the comments made by [~mjsax] and [~bbejeck] that the 
divide-by-zero is not really an issue, the fact that it's now being reported as 
a vulnerability is, especially with regulators.

PITA, but we should consider either getting this vulnerability removed (Google 
wasn't very helpful in providing info on how to do this), or fixed (again, not 
sure how to tag the fix as fixing this issue).  One option may just be to 
reopen the PR and merge (and then fix forward by switching it to throw an 
exception).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-10494) Streams: enableSendingOldValues should not call parent if node is itself materialized

2020-09-17 Thread Andy Coates (Jira)
Andy Coates created KAFKA-10494:
---

 Summary: Streams: enableSendingOldValues should not call parent if 
node is itself materialized
 Key: KAFKA-10494
 URL: https://issues.apache.org/jira/browse/KAFKA-10494
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Andy Coates


Calling `enableSendingOldValues` on many nodes, e.g. `KTableFilter`, 
unnecessarily calls `enableSendingOldValues` on the parent, even when the 
processor itself is materialized. This can force the parent table to be 
materialized unnecessarily.

 

For example:

```

StreamsBuilder builder = new StreamsBuilder();

builder
   .table("t1", Consumed.with(...))
   .filter(predicate, Materialized.as("t2"))
   .<downStreamOps>

```

If `downStreamOps` results in a call to `enableSendingOldValues` on the table 
returned by the `filter` call, i.e. `t2`, then it will result in `t1` being 
materialized unnecessarily.
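
As a rough illustration of the suggested behaviour (all names below are invented for the 
sketch, not the real Kafka Streams internals), the node would only cascade the call to 
its parent when it is not materialized itself:

```
class FilterNodeSketch {
    private final FilterNodeSketch parent;      // upstream table node, if any
    private final String queryableStoreName;    // non-null => this node is materialized
    private boolean sendOldValues;

    FilterNodeSketch(final FilterNodeSketch parent, final String queryableStoreName) {
        this.parent = parent;
        this.queryableStoreName = queryableStoreName;
    }

    void enableSendingOldValues() {
        if (queryableStoreName == null && parent != null) {
            // Only force the parent when we cannot serve old values from our own store.
            parent.enableSendingOldValues();
        }
        sendOldValues = true;
    }
}
```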


This ticket was raised off the back of [comments in a 
PR](https://github.com/apache/kafka/pull/9156#discussion_r490152263) while 
working on https://issues.apache.org/jira/browse/KAFKA-10077.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10077) Filter downstream of state-store results in spurious tombstones

2020-06-01 Thread Andy Coates (Jira)
Andy Coates created KAFKA-10077:
---

 Summary: Filter downstream of state-store results in spurious 
tombstones
 Key: KAFKA-10077
 URL: https://issues.apache.org/jira/browse/KAFKA-10077
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
Reporter: Andy Coates


Adding a `filter` call downstream of anything that has a state store, e.g. a 
table source, results in spurious tombstones being emitted from the topology 
for any key where a new entry doesn't match the filter, _even when no previous 
value existed for the row_.

To put this another way: a filter downstream of a state-store will output a 
tombstone on an INSERT that doesn't match the filter, when it should only 
output a tombstone on an UPDATE.

 

This code shows the problem:


{code:java}
final StreamsBuilder builder = new StreamsBuilder();

builder
 .table("table", Materialized.with(Serdes.Long(), Serdes.Long()))
 .filter((k, v) -> v % 2 == 0)
 .toStream()
 .to("bob");

final Topology topology = builder.build();

final Properties props = new Properties();
props.put("application.id", "fred");
props.put("bootstrap.servers", "who cares");

final TopologyTestDriver driver = new TopologyTestDriver(topology, props);

final TestInputTopic<Long, Long> input = driver
 .createInputTopic("table", Serdes.Long().serializer(), Serdes.Long().serializer());

input.pipeInput(1L, 2L);
input.pipeInput(1L, 1L);
input.pipeInput(2L, 1L);

final TestOutputTopic<Long, Long> output = driver
 .createOutputTopic("bob", Serdes.Long().deserializer(), Serdes.Long().deserializer());

final List<KeyValue<Long, Long>> keyValues = output.readKeyValuesToList();

// keyValues contains:
// 1 -> 2
// 1 -> null <-- correct tombstone: deletes previous row.
// 2 -> null <-- spurious tombstone: no previous row.
{code}
 

These spurious tombstones can cause a LOT of noise when, for example, the 
filter is looking for a specific key.  In such a situation, _every input record 
that does not have that key results in a tombstone_, meaning there are many 
more tombstones than useful data.

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9865) Expose output topic names from TopologyTestDriver

2020-04-14 Thread Andy Coates (Jira)
Andy Coates created KAFKA-9865:
--

 Summary: Expose output topic names from TopologyTestDriver
 Key: KAFKA-9865
 URL: https://issues.apache.org/jira/browse/KAFKA-9865
 Project: Kafka
  Issue Type: Bug
  Components: streams-test-utils
Affects Versions: 2.4.1
Reporter: Andy Coates


Expose the output topic names from TopologyTestDriver, i.e. 
`outputRecordsByTopic.keySet()`.

This is useful to users of the test driver, as they can use it to determine the 
names of all output topics, which can then be used to capture all the output of 
a topology without having to manually list the output topics.
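
For illustration, a sketch of how a test might use such an accessor; the method name 
{{producedTopicNames()}} is an assumption here, not something confirmed by this ticket:

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.TopologyTestDriver;

final class CaptureAllOutput {
    // Read every record produced by the topology, keyed by output topic name.
    static Map<String, List<KeyValue<Long, Long>>> readAll(final TopologyTestDriver driver) {
        final Map<String, List<KeyValue<Long, Long>>> output = new HashMap<>();
        for (final String topic : driver.producedTopicNames()) {   // assumed accessor name
            output.put(topic, driver
                .createOutputTopic(topic, Serdes.Long().deserializer(), Serdes.Long().deserializer())
                .readKeyValuesToList());
        }
        return output;
    }
}
{code}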



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9668) Iterating over KafkaStreams.getAllMetadata() results in ConcurrentModificationException

2020-03-05 Thread Andy Coates (Jira)
Andy Coates created KAFKA-9668:
--

 Summary: Iterating over KafkaStreams.getAllMetadata() results in 
ConcurrentModificationException
 Key: KAFKA-9668
 URL: https://issues.apache.org/jira/browse/KAFKA-9668
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 0.10.1.0
Reporter: Andy Coates
Assignee: Andy Coates


`KafkaStreams.getAllMetadata()` returns 
`StreamsMetadataState.getAllMetadata()`. While the latter method is 
`synchronized`, it returns a reference to internal mutable state.  Not only does 
this break encapsulation, but it means any thread iterating over the returned 
collection when the metadata gets rebuilt will encounter a 
`ConcurrentModificationException`.
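
A minimal sketch of the defensive-copy pattern that would avoid this (illustrative 
only, not the actual Kafka fix): copy the internal collection while holding the lock 
and hand out an unmodifiable snapshot.

{code:java}
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class MetadataHolder<T> {
    private final List<T> allMetadata = new ArrayList<>();   // rebuilt on each rebalance

    synchronized Collection<T> getAllMetadata() {
        // Snapshot taken under the lock; callers iterate a detached, unmodifiable copy.
        return Collections.unmodifiableList(new ArrayList<>(allMetadata));
    }

    synchronized void rebuild(final Collection<T> fresh) {
        allMetadata.clear();
        allMetadata.addAll(fresh);
    }
}
{code}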



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9667) Connect JSON serde strip trailing zeros

2020-03-05 Thread Andy Coates (Jira)
Andy Coates created KAFKA-9667:
--

 Summary: Connect JSON serde strip trailing zeros
 Key: KAFKA-9667
 URL: https://issues.apache.org/jira/browse/KAFKA-9667
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Andy Coates
Assignee: Andy Coates


The Connect Json serde was recently enhanced to support serializing decimals as 
standard JSON numbers, e.g. `1.23`.  However, there is a bug in the 
implementation: it's stripping trailing zeros!  `1.23` is _not_ the same as 
`1.230`.  Trailing zeros should not be dropped when (de)serializing decimals.
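
A couple of lines of plain Java show why the distinction matters: `BigDecimal` treats 
scale as significant, so `1.23` and `1.230` are not equal even though they compare as 
the same numeric value.

{code:java}
import java.math.BigDecimal;

public class DecimalScale {
    public static void main(String[] args) {
        System.out.println(new BigDecimal("1.23").equals(new BigDecimal("1.230")));    // false: scales differ
        System.out.println(new BigDecimal("1.23").compareTo(new BigDecimal("1.230"))); // 0: same numeric value
    }
}
{code}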



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9416) Streams get stuck in `PENDING_SHUTDOWN` if underlying topics are deleted.

2020-01-13 Thread Andy Coates (Jira)
Andy Coates created KAFKA-9416:
--

 Summary: Streams get stuck in `PENDING_SHUTDOWN` if underlying 
topics are deleted.
 Key: KAFKA-9416
 URL: https://issues.apache.org/jira/browse/KAFKA-9416
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.0
Reporter: Andy Coates


We've noticed that if topics are deleted from under a running topology, e.g. 
repartition, changelog or sink topics, then the stream threads transition from 
`RUNNING` to `PENDING_SHUTDOWN`, but then do not transition to `ERROR`.

Likewise, if a Kafka cluster has auto topic create disabled and a topology is 
started where its sink topic(s) do not exist, then the topology similarly gets 
stuck in `PENDING_SHUTDOWN`.

Once the topology is stuck in `PENDING_SHUTDOWN`, any call to close it blocks, 
as per https://issues.apache.org/jira/browse/KAFKA-9398.

We would like to see Kafka Streams handle this case and correctly transition to 
`ERROR` state.

The cause of this is covered in more detail here: 
https://github.com/confluentinc/ksql/issues/4268#issuecomment-573020281
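
For reference, a hedged sketch of how an application can observe the transitions 
described above via the public state listener API; this only detects the stuck state, 
it does not fix it, and the topic names and configs below are placeholders.

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;

public class StuckStateDetector {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put("application.id", "stuck-state-detector");   // placeholder config
        props.put("bootstrap.servers", "localhost:9092");       // placeholder config

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("in").to("out");                          // trivial topology for illustration

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.PENDING_SHUTDOWN) {
                System.err.println("Transitioned " + oldState + " -> " + newState
                        + "; if it never reaches NOT_RUNNING or ERROR, the topology is stuck.");
            }
        });
        streams.start();
    }
}
{code}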



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-8454) Add Java AdminClient interface

2019-05-31 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-8454:
--

 Summary: Add Java AdminClient interface
 Key: KAFKA-8454
 URL: https://issues.apache.org/jira/browse/KAFKA-8454
 Project: Kafka
  Issue Type: Bug
  Components: admin, clients, core, streams
Reporter: Andy Coates
Assignee: Andy Coates


Task to track the work of [KIP-476: Add Java AdminClient 
Interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-476%3A+Add+Java+AdminClient+Interface]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7069) AclCommand does not allow 'create' operation on 'topic'

2018-06-18 Thread Andy Coates (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates resolved KAFKA-7069.

Resolution: Invalid

> AclCommand does not allow 'create'  operation on 'topic'
> 
>
> Key: KAFKA-7069
> URL: https://issues.apache.org/jira/browse/KAFKA-7069
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security
>Affects Versions: 2.0.0
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
>
> KAFKA-6726 saw 
> [KIP-277|https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API]
>  implemented, which extended the set of operations allowed on the 'topic' 
> resource type to include 'create'.
> The AclCommand CLI class currently rejects this new operation, e.g. running:
> {{bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 
> --add --allow-principal User:KSQL --operation create --topic t1}}
> Fails with error:
> {{ResourceType Topic only supports operations 
> Read,All,AlterConfigs,DescribeConfigs,Delete,Write,Describe}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7069) AclCommand does not allow 'create' operation on 'topic'

2018-06-18 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-7069:
--

 Summary: AclCommand does not allow 'create'  operation on 'topic'
 Key: KAFKA-7069
 URL: https://issues.apache.org/jira/browse/KAFKA-7069
 Project: Kafka
  Issue Type: Bug
  Components: core, security
Affects Versions: 2.0.0
Reporter: Andy Coates
Assignee: Andy Coates


KAFKA-6726 saw 
[KIP-277|https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API]
 implemented, which extended the set of operations allowed on the 'topic' 
resource type to include 'create'.

The AclCommand CLI class currently rejects this new operation, e.g. running:

{{bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add 
--allow-principal User:KSQL --operation create --topic t1}}

Fails with error:

{{ResourceType Topic only supports operations 
Read,All,AlterConfigs,DescribeConfigs,Delete,Write,Describe}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7008) Consider replacing the Resource field in AclBinding with a ResourceFilter or ResourceMatcher

2018-06-07 Thread Andy Coates (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7008?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates resolved KAFKA-7008.

Resolution: Won't Fix

> Consider replacing the Resource field in AclBinding with a ResourceFilter or 
> ResourceMatcher
> 
>
> Key: KAFKA-7008
> URL: https://issues.apache.org/jira/browse/KAFKA-7008
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, security
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Major
> Fix For: 2.0.0
>
>
> Relating to one of the outstanding work items in PR 
> [#5117|https://github.com/apache/kafka/pull/5117]...
> The AclBinding class currently has a Resource field member. But this may make 
> more sense as a ResourceFilter, or some new ResourceMatcher / 
> ResourceSelector class.
> Investigate...
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7011) Investigate if it's possible to drop the ResourceNameType field from Java Resource class.

2018-06-06 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-7011:
--

 Summary: Investigate if it's possible to drop the ResourceNameType 
field from Java Resource class.
 Key: KAFKA-7011
 URL: https://issues.apache.org/jira/browse/KAFKA-7011
 Project: Kafka
  Issue Type: Sub-task
  Components: core, security
Reporter: Andy Coates
 Fix For: 2.0.0






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7010) Rename ResourceNameType.ANY to MATCH

2018-06-06 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-7010:
--

 Summary: Rename ResourceNameType.ANY to MATCH
 Key: KAFKA-7010
 URL: https://issues.apache.org/jira/browse/KAFKA-7010
 Project: Kafka
  Issue Type: Sub-task
  Components: core, security
Reporter: Andy Coates
 Fix For: 2.0.0


Following on from the PR [#5117|https://github.com/apache/kafka/pull/5117] 
and discussions with Colin McCabe...

The current ResourceNameType.ANY may be misleading as it performs pattern 
matching for wildcard and prefixed bindings. Whereas ResourceName.ANY just 
brings back any resource name.

Renaming to ResourceNameType.MATCH and adding more Java doc should clear this 
up.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7008) Consider replacing the Resource field in AclBinding with a ResourceFilter or ResourceMatcher.

2018-06-06 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-7008:
--

 Summary: Consider replacing the Resource field in AclBinding with 
a ResourceFilter or ResourceMatcher.
 Key: KAFKA-7008
 URL: https://issues.apache.org/jira/browse/KAFKA-7008
 Project: Kafka
  Issue Type: Sub-task
  Components: core, security
Reporter: Andy Coates
Assignee: Andy Coates
 Fix For: 2.0.0


Relating to one of the outstanding work items in PR 
[#5117|https://github.com/apache/kafka/pull/5117]...

The AclBinding class currently has a Resource field member. But this may make 
more sense as a ResourceFilter, or some new ResourceMatcher / ResourceSelector 
class.

Investigate...

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7007) All ACL changes should use single /kafka-acl-changes path

2018-06-06 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-7007:
--

 Summary: All ACL changes should use single /kafka-acl-changes path 
 Key: KAFKA-7007
 URL: https://issues.apache.org/jira/browse/KAFKA-7007
 Project: Kafka
  Issue Type: Sub-task
  Components: core, security
Reporter: Andy Coates
Assignee: Andy Coates
 Fix For: 2.0.0


Relating to one of the outstanding work items in PR 
[#5117|https://github.com/apache/kafka/pull/5117]...

 

The above PR sees ACL change notifications come through two paths.  Change 
the code to use a single path, with a JSON value that defines the 
resource-name-type of the changed binding.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7006) Remove duplicate Scala ResourceNameType class.

2018-06-06 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-7006:
--

 Summary: Remove duplicate Scala ResourceNameType class.
 Key: KAFKA-7006
 URL: https://issues.apache.org/jira/browse/KAFKA-7006
 Project: Kafka
  Issue Type: Sub-task
  Components: core, security
Reporter: Andy Coates
Assignee: Andy Coates
 Fix For: 2.0.0


Relating to one of the outstanding work items in PR 
[#5117|https://github.com/apache/kafka/pull/5117]...

The kafka.security.auth.ResourceTypeName class should be dropped in favour of 
the Java equivalent.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7005) Remove duplicate Java Resource class.

2018-06-06 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-7005:
--

 Summary: Remove duplicate Java Resource class.
 Key: KAFKA-7005
 URL: https://issues.apache.org/jira/browse/KAFKA-7005
 Project: Kafka
  Issue Type: Sub-task
  Components: core, security
Reporter: Andy Coates
Assignee: Andy Coates
 Fix For: 2.0.0


Relating to one of the outstanding work items in PR 
[#5117|https://github.com/apache/kafka/pull/5117]...

The o.a.k.c.request.Resource class could be dropped in favour of 
o.a.k.c.config.ConfigResource.

This will remove the duplication of `Resource` classes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6849) Add transformValues() method to KTable

2018-05-02 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-6849:
--

 Summary:  Add transformValues() method to KTable
 Key: KAFKA-6849
 URL: https://issues.apache.org/jira/browse/KAFKA-6849
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Andy Coates
Assignee: Andy Coates


Add {{transformValues()}} methods to the {{KTable}} interface with the same 
semantics as the functions of the same name on the {{KStream}} interface.

 

More details in 
[KIP-292|https://cwiki.apache.org/confluence/display/KAFKA/KIP-292%3A+Add+transformValues%28%29+method+to+KTable].
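
A hedged usage sketch of what the new API might look like, based on the existing 
{{KStream}} equivalents; treat the method signature as illustrative rather than final.

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.processor.ProcessorContext;

public class KTableTransformValuesSketch {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        final KTable<String, Integer> lengths = builder
            .table("input", Consumed.with(Serdes.String(), Serdes.String()))
            .transformValues(() -> new ValueTransformerWithKey<String, String, Integer>() {
                @Override
                public void init(final ProcessorContext context) { }

                @Override
                public Integer transform(final String readOnlyKey, final String value) {
                    return value == null ? null : value.length();   // stateless example transform
                }

                @Override
                public void close() { }
            });

        lengths.toStream().to("output");
        builder.build();
    }
}
{code}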



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6727) org.apache.kafka.clients.admin.Config has broken equals and hashCode methods.

2018-03-29 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-6727:
--

 Summary: org.apache.kafka.clients.admin.Config has broken equals 
and hashCode methods.
 Key: KAFKA-6727
 URL: https://issues.apache.org/jira/browse/KAFKA-6727
 Project: Kafka
  Issue Type: Improvement
  Components: clients, tools
Affects Versions: 1.1.0
Reporter: Andy Coates
Assignee: Andy Coates


`Config` makes use of `Collections.unmodifiableCollection` to wrap the supplied 
entries to make it immutable. Unfortunately, this breaks `hashCode` and 
`equals`.

From the Java docs:

> The returned collection does _not_ pass the hashCode and equals operations 
>through to the backing collection, but relies on {{Object}}'s {{equals}} and 
>{{hashCode}} methods.

See: 
https://docs.oracle.com/javase/7/docs/api/java/util/Collections.html#unmodifiableCollection(java.util.Collection)
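
For illustration, the quoted behaviour is easy to reproduce with plain Java: two 
unmodifiable wrappers around equal backing lists do not compare equal, because the 
wrapper falls back to identity-based {{equals}}/{{hashCode}}.

{code:java}
import java.util.Collection;
import java.util.Collections;
import java.util.List;

public class UnmodifiableEquals {
    public static void main(String[] args) {
        final Collection<String> a = Collections.unmodifiableCollection(List.of("x"));
        final Collection<String> b = Collections.unmodifiableCollection(List.of("x"));

        System.out.println(a.equals(b));                       // false: Object identity, not contents
        System.out.println(List.of("x").equals(List.of("x"))); // true: List equality is element-based
    }
}
{code}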

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5246) Remove backdoor that allows any client to produce to internal topics

2017-06-08 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-5246:
---
Resolution: Won't Fix
Status: Resolved  (was: Patch Available)

Discussions on the PR mean we're closing this without fixing. The workaround is 
to use ACLs to lock down the __consumer_offsets topic so that only required 
use-cases have direct access to produce to it.

>  Remove backdoor that allows any client to produce to internal topics
> -
>
> Key: KAFKA-5246
> URL: https://issues.apache.org/jira/browse/KAFKA-5246
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Minor
>
> kafka.admin.AdminUtils defines an 'AdminClientId' val, which looks to be 
> unused in the code, with the exception of a single use in KafkaApis.scala in 
> handleProducerRequest, where it looks to allow any client, using the special 
> '__admin_client' client id, to append to internal topics.
> This looks like a security risk to me, as it would allow any client to 
> produce either rogue offsets or even a record containing something other than 
> group/offset info.
> Can we remove this please?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (KAFKA-5246) Remove backdoor that allows any client to produce to internal topics

2017-05-15 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-5246:
---
Status: Patch Available  (was: Open)

>  Remove backdoor that allows any client to produce to internal topics
> -
>
> Key: KAFKA-5246
> URL: https://issues.apache.org/jira/browse/KAFKA-5246
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1, 0.10.2.0, 0.10.1.1, 0.10.1.0, 0.10.0.1, 
> 0.10.0.0
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Minor
>
> kafka.admin.AdminUtils defines an 'AdminClientId' val, which looks to be 
> unused in the code, with the exception of a single use in KafkaApis.scala in 
> handleProducerRequest, where it looks to allow any client, using the special 
> '__admin_client' client id, to append to internal topics.
> This looks like a security risk to me, as it would allow any client to 
> produce either rogue offsets or even a record containing something other than 
> group/offset info.
> Can we remove this please?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Assigned] (KAFKA-5246) Remove backdoor that allows any client to produce to internal topics

2017-05-15 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates reassigned KAFKA-5246:
--

Assignee: Andy Coates

>  Remove backdoor that allows any client to produce to internal topics
> -
>
> Key: KAFKA-5246
> URL: https://issues.apache.org/jira/browse/KAFKA-5246
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1
>Reporter: Andy Coates
>Assignee: Andy Coates
>Priority: Minor
>
> kafka.admin.AdminUtils defines an 'AdminClientId' val, which looks to be 
> unused in the code, with the exception of a single use in KafkaApis.scala in 
> handleProducerRequest, where it looks to allow any client, using the special 
> '__admin_client' client id, to append to internal topics.
> This looks like a security risk to me, as it would allow any client to 
> produce either rogue offsets or even a record containing something other than 
> group/offset info.
> Can we remove this please?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-5246) Remove backdoor that allows any client to produce to internal topics

2017-05-15 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16010948#comment-16010948
 ] 

Andy Coates commented on KAFKA-5246:


I've got a patch waiting for this if someone can assign this Jira to me and 
give me whatever permissions are needed to allow me to raise a PR in GitHub.

Thanks,

Andy

>  Remove backdoor that allows any client to produce to internal topics
> -
>
> Key: KAFKA-5246
> URL: https://issues.apache.org/jira/browse/KAFKA-5246
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.0.0, 0.10.0.1, 0.10.1.0, 0.10.1.1, 0.10.2.0, 
> 0.10.2.1
>Reporter: Andy Coates
>Priority: Minor
>
> kafka.admin.AdminUtils defines an 'AdminClientId' val, which looks to be 
> unused in the code, with the exception of a single use in KafkaApis.scala in 
> handleProducerRequest, where it looks to allow any client, using the special 
> '__admin_client' client id, to append to internal topics.
> This looks like a security risk to me, as it would allow any client to 
> produce either rogue offsets or even a record containing something other than 
> group/offset info.
> Can we remove this please?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (KAFKA-5246) Remove backdoor that allows any client to produce to internal topics

2017-05-15 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-5246:
--

 Summary:  Remove backdoor that allows any client to produce to 
internal topics
 Key: KAFKA-5246
 URL: https://issues.apache.org/jira/browse/KAFKA-5246
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.10.2.1, 0.10.2.0, 0.10.1.1, 0.10.1.0, 0.10.0.1, 0.10.0.0
Reporter: Andy Coates
Priority: Minor


kafka.admin.AdminUtils defines an 'AdminClientId' val, which looks to be unused 
in the code, with the exception of a single use in KafkaApis.scala in 
handleProducerRequest, where it looks to allow any client, using the special 
'__admin_client' client id, to append to internal topics.

This looks like a security risk to me, as it would allow any client to produce 
either rogue offsets or even a record containing something other than 
group/offset info.

Can we remove this please?



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-09-26 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15522746#comment-15522746
 ] 

Andy Coates commented on KAFKA-3919:


Thanks [~ijuma]. Given that the problems I'm experiencing are purely related to 
messages produced with acks=1 and KAFKA-2111 speaks explicitly about acks>1, 
I'd be interested if [~junrao] thinks that 2111 does indeed fix this issue too, 
given this new information on the problem.

Thanks all!

> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first record has an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We also looked through Kafka's application logs to try and piece 
> together the series of events leading up to this. Here's what we know 
> happened, with regards to one partition that has issues, from the logs:
> Prior 

[jira] [Commented] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-09-26 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15522583#comment-15522583
 ] 

Andy Coates commented on KAFKA-3919:


We experienced a similar incident again recently. This time the cluster was 
destabilised when there were intermittent connectivity issues with ZK for a few 
minutes. This resulted in a quick succession of Controller elections and 
changes, and ISR shrinks and grows.  During the ISR shrinks and grows some 
brokers seem to have got themselves into the same inconsistent state as above 
and halted. To recover the brokers we needed to manually delete the corrupted 
log segments.

So it looks like the above issue is not related, or not just related, to 
ungraceful shutdown. This inconsistent state appears to be possible during ISR 
changes where producers are producing with acks=1. Though obviously the ZK 
connectivity issues will likely have played a role.


> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first record has an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal 

[jira] [Commented] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-12 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15372565#comment-15372565
 ] 

Andy Coates commented on KAFKA-3919:


[~junrao]  Good stuff. Look forward to hearing from you and getting involved 
more =)

> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first record has an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We also looked through Kafka's application logs to try and piece 
> together the series of events leading up to this. Here's what we know 
> happened, with regards to one partition that has issues, from the logs:
> Prior to outage:
> * Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being 
> the preferred leader.
> * Producers using acks=1, compression=gzip
> * Brokers configured with 

[jira] [Commented] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-08 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15367489#comment-15367489
 ] 

Andy Coates commented on KAFKA-3919:


[~junrao] Yes, we lost a good number of brokers in a power outage.

The solution you're proposing in KAFKA-1211 looks fairly involved, i.e. a 
protocol change. Is this something you think I can pick up (having never 
committed to Kafka, but being no newbie to distributed programming), or something 
I'd be best off leaving to the committers? (I'm conscious that it's been sat as a 
known issue for a long time, and internally this is viewed as a blocker...)



> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first record has an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We also looked through Kafka's application logs to try and 

[jira] [Commented] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-06 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15364141#comment-15364141
 ] 

Andy Coates commented on KAFKA-3919:


[~junrao] My understanding was that the offset index looks at the offset of the 
first record in the compressed set, not the last.  Having checked the code this 
does seem to be the case. (Code below from LogSegment.scala, recover()):

{code}
val startOffset =
  entry.message.compressionCodec match {
    case NoCompressionCodec =>
      entry.offset
    case _ =>
      ByteBufferMessageSet.deepIterator(entry.message).next().offset
  }
{code}

> Broker fails to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> We've spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong).
> First off, we have unclean leadership elections disabled. (We did later enable 
> them to help get around some other issues we were having, but this was 
> several hours after this issue manifested), and we're producing to the topic 
> with gzip compression and acks=1.
> We looked through the data logs that were causing the brokers to not start. 
> What we found was that the initial part of the log has monotonically increasing 
> offsets, where each compressed batch normally contained one or two records. 
> Then there is a batch that contains many records, whose first record has an 
> offset below the previous batch and whose last record has an offset above the 
> previous batch. Following on from this there continues a period of large 
> batches, with monotonically increasing offsets, and then the log returns to 
> batches with one or two records.
> Our working assumption here is that the period before the offset dip, with 
> the small batches, is pre-outage normal operation. The period of larger 
> batches is from just after the outage, where producers have a backlog to 
> process when the partition becomes available, and then things return to 
> normal batch sizes again once the backlog clears.
> We did also look 

[jira] [Updated] (KAFKA-3919) Broker fails to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-05 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

We've spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong).

First off, we have unclean leadership elections disabled. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested.) We're also producing to the topic with gzip 
compression and acks=1.

We looked through the data logs that were causing the brokers to not start. 
What we found was that the initial part of the log has monotonically increasing 
offsets, where each compressed batch normally contained one or two records. 
Then there is a batch that contains many records, whose first record has an 
offset below the previous batch and whose last record has an offset above the 
previous batch. Following on from this there is a period of large batches, with 
monotonically increasing offsets, and then the log returns to batches with one 
or two records.

Our working assumption here is that the period before the offset dip, with the 
small batches, is pre-outage normal operation. The period of larger batches is 
from just after the outage, where producers have a backlog to process when the 
partition becomes available, and then things return to normal batch sizes 
again once the backlog clears.

We also looked through Kafka's application logs to try and piece together the 
series of events leading up to this. Here’s what we know happened, with regard 
to one partition that has issues, from the logs:

Prior to outage:
* Replicas for the partition are brokers 2011, 2012, 2024, with 2024 being the 
preferred leader.
* Producers using acks=1, compression=gzip (see the sketch after this list)
* Brokers configured with unclean.elections=false, zk.session-timeout=36s
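
(For reference, a minimal sketch of the producer settings described above, 
using the Java producer client. The bootstrap address and record value are 
placeholders rather than our real configuration, and the broker-side settings 
referred to above are, I believe, unclean.leader.election.enable and 
zookeeper.session.timeout.ms in the actual broker config.)

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OutageScenarioProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-2011:9092");   // placeholder address
        // acks=1: the leader acknowledges once it has appended the record to its
        // own log, without waiting for followers to replicate it.
        props.put("acks", "1");
        // gzip: the producer sends compressed batches, so one "message" on the
        // broker is really a compressed set of records.
        props.put("compression.type", "gzip");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("mt_xp_its_music_main_itsevent", "example-value"));
        }
    }
}
{code}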

Post outage:
* 2011 comes up first (also as the Controller), recovers unflushed log segment 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
* 2012 comes up next, recovers its log, recovers unflushed log segment 
1239444214, truncates to offset 1239742830 (that’s 2,228 records ahead of the 
recovered offset of the current leader), and starts following.
* 2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, 
truncates to offset 1239742250 (that’s 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
* The Controller adds 2024 to the 

[jira] [Updated] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-05 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

We've spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong).

First off, we have unclean leadership elections disabled. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested.) We're also producing to the topic with gzip 
compression and acks=1.

We looked through the data logs that were causing the brokers to not start. 
What we found was that the initial part of the log has monotonically increasing 
offsets, where each compressed batch normally contained one or two records. 
Then there is a batch that contains many records, whose first record has an 
offset below the previous batch and whose last record has an offset above the 
previous batch. Following on from this there is a period of large batches, with 
monotonically increasing offsets, and then the log returns to batches with one 
or two records.

Our working assumption here is that the period before the offset dip, with the 
small batches, is pre-outage normal operation. The period of larger batches is 
from just after the outage, where producers have a backlog to process when the 
partition becomes available, and then things return to normal batch sizes 
again once the backlog clears.

We also looked through Kafka's application logs to try and piece together the 
series of events leading up to this. Here’s what we know happened, with regard 
to one partition that has issues, from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first (also as the Controller), recovers unflushed log segment 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
2012 comes up next, recovers its log, recovers unflushed log segment 
1239444214, truncates to offset 1239742830 (that’s 2,228 records ahead of the 
recovered offset of the current leader), and starts following.
2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, 
truncates to offset 1239742250 (that’s 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set 

[jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-05 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15362604#comment-15362604
 ] 

Andy Coates commented on KAFKA-3919:


Hi [~junrao], thanks for taking the time to look at this.

First off, we have unclean leadership elections disabled. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested.)

We did look through the data logs that were causing the brokers to not start. 
What we found before the incident was monotonically increasing offsets, where 
each compressed batch normally contained one or two records. Then there is a 
batch that contains many records, whose first record has an offset below the 
previous batch and whose last record has an offset above the previous batch. 
Following on from this there is a period of large batches, with monotonically 
increasing offsets, and then the log returns to batches with one or two 
records.

Our working assumption here is that the period before the offset dip is 
pre-outage normal operation. The period of larger batches is from just after 
the outage, where producers have a backlog to process when the partition 
becomes available, and then things return to normal batch sizes again once the 
backlog clears.

We also looked through Kafka's application logs to try and piece together the 
series of events leading up to this:

Here’s what I know happened, with regard to one partition that has issues, 
from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first (also as the Controller), recovers unflushed log segment 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
2012 comes up next, recovers its log, recovers unflushed log segment 
1239444214, truncates to offset 1239742830 (that’s 2,228 records ahead of the 
recovered offset of the current leader), and starts following.
2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, 
truncates to offset 1239742250 (that’s 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to 
another partition having an offset greater than the leader's.
The Controller adds 2012 to the replica set just before 2012 halts due to 
another partition having an offset greater than the leader's.
When 2012 is next restarted, it fails to fully start as it’s complaining of 
invalid offsets in the log.

You’ll notice that the offsets the brokers truncate to are different for each 
of the three brokers.

Given that I can write to the partition with only one broker available, and 
that I can then take this broker down and bring up a different one from the 
replica set and write to that one, how does Kafka currently reconcile these 
different histories when the first node is brought back online? I know that if 
the first node has a greater offset it will halt when it tries to follow the 
second, but what happens if the first node has a lower offset?

Maybe the above scenario is correctly handled and I’m off down a tangent! (I’d 
appreciate any info to improve my understanding of Kafka and help me figure out 
what is happening here.) I’m just trying to reconcile the data I’m seeing in 
the logs with your response to my post.

I’m going to extract the pertinent entries from our app logs, obfuscate and add 
them in here.

(I’ll also add some of what I’ve written here to the description above for the 
benefit of anyone new to the ticket.)

Thanks,

Andy

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at 

[jira] [Updated] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

I’ve spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong).

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (e.g. 
min.isr=2, #replicas=3)
* Followers in the ISR are lagging behind the leader by some small number of 
messages (which is normal with acks=1)
* Brokers are configured with a fairly large zk session timeout, e.g. 30s.
* Brokers are configured so that unclean leader elections are disabled.

Then:
When something like a power outage takes out all three replicas, it’s possible 
to get into a state such that the indexes won’t rebuild on a restart and a 
broker fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come online for the partition 
to be writeable
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader's head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas.  It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log - at the 
same offsets.

I’m assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it’s missing.

I’m also assuming that, because the producers were using gzip, each record is 
actually a compressed message set, so when the pre-outage leader requests 
records from the leader, the offset it requests could just happen to be in the 
middle of a compressed batch, but the leader returns the full batch. When the 
pre-outage leader appends this batch to its own log it thinks all is OK. But 
what has happened is that the offsets in the log are no longer monotonically 
incrementing. Instead they actually dip by the number of records in the 
compressed batch that were before the requested offset. If and when this 
broker restarts, this dip may be at the 4K boundary the indexer checks. If it 
is, the broker won’t start.
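
(To make that concrete: in the startup error above the index had last recorded 
offset 1239742822 and was then asked to append 1239742691, a dip of 131 
offsets. Below is a small, self-contained illustration of the invariant I 
believe the sparse offset index enforces during recovery; it is not Kafka's 
actual OffsetIndex code, just a sketch of why a dip only kills the broker if it 
lands on an indexed record.)

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified illustration (not Kafka's actual OffsetIndex) of the invariant
 * that appears to break on restart: the sparse index only accepts entries
 * whose offsets are strictly larger than the last entry appended, and during
 * recovery an entry is only added every index-interval's worth of log data
 * (4K by default). So a non-monotonic dip in the log is harmless unless it
 * happens to fall on one of those sampled records.
 */
public class SparseIndexIllustration {
    private final List<Long> indexedOffsets = new ArrayList<>();

    void append(long offset) {
        if (!indexedOffsets.isEmpty()
                && offset <= indexedOffsets.get(indexedOffsets.size() - 1)) {
            throw new IllegalStateException("Attempt to append an offset (" + offset
                    + ") no larger than the last offset appended ("
                    + indexedOffsets.get(indexedOffsets.size() - 1) + ")");
        }
        indexedOffsets.add(offset);
    }

    public static void main(String[] args) {
        SparseIndexIllustration index = new SparseIndexIllustration();
        index.append(1239742500L);   // sampled offsets increase: fine
        index.append(1239742822L);
        index.append(1239742691L);   // the dip lands on a sampled record: throws,
                                     // mirroring the InvalidOffsetException at startup
    }
}
{code}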

Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
protracted outage.  We’ve written a little utility that shows several more 

[jira] [Issue Comment Deleted] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Comment: was deleted

(was: Hi [~junrao], thanks for taking the time to look at this.

Note: I've incorporated some of what I say below into the problem description 
above, so that it doesn't get lost in the comments.

First off, we have unclean leadership elections disabled. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested.)

We did look through the data logs that were causing the brokers to not start. 
What we found before the incident was monotonically increasing offsets, where 
each compressed batch normally contained one or two records. Then there is a 
batch that contains many records, whose first record has an offset below the 
previous batch and whose last record has an offset above the previous batch. 
Following on from this there is a period of large batches, with monotonically 
increasing offsets, and then the log returns to batches with one or two 
records.

Our working assumption here is that the period before the offset dip is 
pre-outage normal operation. The period of larger batches is from just after 
the outage, where producers have a backlog to process when the partition 
becomes available, and then things return to normal batch sizes again once the 
backlog clears.

We also looked through Kafka's application logs to try and piece together the 
series of events leading up to this:

Here’s what I know happened, with regard to one partition that has issues, 
from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first (also as the Controller), recovers unflushed log segment 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
2012 comes up next, recovers its log, recovers unflushed log segment 
1239444214, truncates to offset 1239742830 (that’s 2,228 records ahead of the 
recovered offset of the current leader), and starts following.
2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, 
truncates to offset 1239742250 (that’s 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to 
another partition having an offset greater than the leader's.
The Controller adds 2012 to the replica set just before 2012 halts due to 
another partition having an offset greater than the leader's.
When 2012 is next restarted, it fails to fully start as it’s complaining of 
invalid offsets in the log.

Our working hypothesis here is that the partition becomes writeable again, 
possibly as brokers 2012 & 2024 get added to the ISR set before halting, and 
maybe don’t remove themselves when they halt? Hence they remain in the ISR set 
for 36 seconds. Meanwhile our producers are happily sending large compressed 
batches, as they have a backlog, to broker 2011, which is accepting them (as 
there are enough replicas in the ISR set) and appending them to its log - 
moving its offset beyond brokers 2012 and 2024.

Log entries:

(Interleaved log entries from the three brokers - the broker id is in the [id] 
brackets)

Just as the power was going out I see this in the broker that was the 
controller:

2016-04-11 12:01:42 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:01:56 - [2026] - "[Replica state machine on controller 2026]: 
Invoking state change to OfflineReplica for replicas
[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024] 

2016-04-11 12:01:56 - [2026] - "[Controller 2026]: Cannot remove replica 2024 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011, 2012)”

2016-04-11 12:01:56 - [2026] - "[Channel manager on controller 2026]: Not 
sending request 
{controller_id=2026,controller_epoch=111,delete_partitions=0,partitions=[{topic=mt_xp_its_music_main_itsevent,partition=20}]}
 to broker 2024, since it is offline.”

2016-04-11 12:04:46 - [2026] - [Replica state machine on controller 2026]: 
Invoking state change to OnlineReplica for replicas
[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024]

2016-04-11 12:04:58 - [2026] - "[Controller 2026]: Starting preferred replica 
leader election for partitions [mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] 

[jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15358704#comment-15358704
 ] 

Andy Coates commented on KAFKA-3919:


Hi [~junrao], thanks for taking the time to look at this.

Note: I've incorporated some of what I say below into the problem description 
above, so that it doesn't get lost in the comments.

First off, we have unclean leadership elections disabled. (We did later enable 
them to help get around some other issues we were having, but this was several 
hours after this issue manifested.)

We did look through the data logs that were causing the brokers to not start. 
What we found before the incident was monotonically increasing offsets, where 
each compressed batch normally contained one or two records. Then there is a 
batch that contains many records, whose first record has an offset below the 
previous batch and whose last record has an offset above the previous batch. 
Following on from this there is a period of large batches, with monotonically 
increasing offsets, and then the log returns to batches with one or two 
records.

Our working assumption here is that the period before the offset dip is 
pre-outage normal operation. The period of larger batches is from just after 
the outage, where producers have a backlog to process when the partition 
becomes available, and then things return to normal batch sizes again once the 
backlog clears.

We also looked through Kafka's application logs to try and piece together the 
series of events leading up to this:

Here’s what I know happened, with regard to one partition that has issues, 
from the logs:

Prior to outage:
Replicas for the partition are brokers 2011, 2012,  2024, with 2024 being the 
preferred leader.
Producers using acks=1, compression=gzip
Brokers configured with unclean.elections=false, zk.session-timeout=36s

Post outage:
2011 comes up first (also as the Controller), recovers unflushed log segment 
1239444214, completes load with offset 1239740602, and becomes leader of the 
partition.
2012 comes up next, recovers its log, recovers unflushed log segment 
1239444214, truncates to offset 1239742830 (that’s 2,228 records ahead of the 
recovered offset of the current leader), and starts following.
2024 comes up quickly after 2012, recovers unflushed log segment 1239444214, 
truncates to offset 1239742250 (that’s 1,648 records ahead of the recovered 
offset of the current leader), and starts following.
The Controller adds 2024 to the replica set just before 2024 halts due to 
another partition having an offset greater than the leader's.
The Controller adds 2012 to the replica set just before 2012 halts due to 
another partition having an offset greater than the leader's.
When 2012 is next restarted, it fails to fully start as it’s complaining of 
invalid offsets in the log.

Our working hypothesis here is that the partition becomes writeable again, 
possibly as brokers 2012 & 2024 get added to the ISR set before halting, and 
maybe don’t remove themselves when they halt? Hence they remain in the ISR set 
for 36 seconds. Meanwhile our producers are happily sending large compressed 
batches, as they have a backlog, to broker 2011, which is accepting them (as 
there are enough replicas in the ISR set) and appending them to its log - 
moving its offset beyond brokers 2012 and 2024.
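
(Aside: a quick way to watch ISR membership while testing this hypothesis. The 
sketch below uses the modern Java AdminClient, which didn't exist in 0.9.0.1 - 
at the time the equivalent information came from kafka-topics.sh --describe. 
The bootstrap address is a placeholder.)

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;

public class IsrWatcher {
    public static void main(String[] args) throws Exception {
        String topic = "mt_xp_its_music_main_itsevent";
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-2011:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            TopicDescription desc = admin.describeTopics(Collections.singleton(topic))
                    .all().get().get(topic);
            // Print leader and ISR per partition: if a halted broker really does
            // linger in the ISR for the ZK session timeout, it shows up here.
            for (TopicPartitionInfo p : desc.partitions()) {
                String isr = p.isr().stream()
                        .map(Node::idString)
                        .collect(Collectors.joining(","));
                System.out.printf("partition %d leader=%s isr=[%s]%n",
                        p.partition(),
                        p.leader() == null ? "none" : p.leader().idString(),
                        isr);
            }
        }
    }
}
{code}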

Log entries:

(Interleaved log entries from the three brokers - the broker id is in the [id] 
brackets)

Just as the power was going out I see this in the broker that was the 
controller:

2016-04-11 12:01:42 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:01:56 - [2026] - "[Replica state machine on controller 2026]: 
Invoking state change to OfflineReplica for replicas
[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024] 

2016-04-11 12:01:56 - [2026] - "[Controller 2026]: Cannot remove replica 2024 
from ISR of partition [mt_xp_its_music_main_itsevent,20] since it is not in the 
ISR. Leader = 2011 ; ISR = List(2011, 2012)”

2016-04-11 12:01:56 - [2026] - "[Channel manager on controller 2026]: Not 
sending request 
{controller_id=2026,controller_epoch=111,delete_partitions=0,partitions=[{topic=mt_xp_its_music_main_itsevent,partition=20}]}
 to broker 2024, since it is offline.”

2016-04-11 12:04:46 - [2026] - [Replica state machine on controller 2026]: 
Invoking state change to OnlineReplica for replicas
[Topic=mt_xp_its_music_main_itsevent,Partition=20,Replica=2024]

2016-04-11 12:04:58 - [2026] - "[Controller 2026]: Starting preferred replica 
leader election for partitions [mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 - [2026] - "[Partition state machine on Controller 2026]: 
Invoking state change to OnlinePartition for partitions 
[mt_xp_its_music_main_itsevent,20]”

2016-04-11 12:04:58 

[jira] [Updated] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-07-01 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

I’ve spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (e.g. 
min.isr=2, #replicas=3)
* Followers in the ISR are lagging behind the leader by some small number of 
messages (which is normal with acks=1)
* Brokers are configured with a fairly large zk session timeout, e.g. 30s.
* Brokers are configured so that unclean leader elections are disabled.

Then:
When something like a power outage takes out all three replicas, it’s possible 
to get into a state such that the indexes won’t rebuild on a restart and a 
broker fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come online for the partition 
to be writeable
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader's head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas.  It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log - at the 
same offsets.

I’m assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it’s missing.

I’m also assuming that, because the producers were using gzip, each record is 
actually a compressed message set, so when the pre-outage leader requests 
records from the leader, the offset it requests could just happen to be in the 
middle of a compressed batch, but the leader returns the full batch. When the 
pre-outage leader appends this batch to its own log it thinks all is OK. But 
what has happened is that the offsets in the log are no longer monotonically 
incrementing. Instead they actually dip by the number of records in the 
compressed batch that were before the requested offset. If and when this 
broker restarts, this dip may be at the 4K boundary the indexer checks. If it 
is, the broker won’t start.

Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
protracted outage.  We’ve written a little utility that shows several more 

[jira] [Commented] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-06-30 Thread Andy Coates (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15356679#comment-15356679
 ] 

Andy Coates commented on KAFKA-3919:


[~ijuma] - duplicate submission not intentional (Jira timed out on first 
submission). Closed as duplicate.
[~junrao] - Yep, I did check app-logs for truncation points and data-logs to 
see what the offsets were doing. I'll get back to you shortly with details.

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3919
> URL: https://issues.apache.org/jira/browse/KAFKA-3919
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> I’ve spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong):
> Given:
> * A topic that is produced to using acks = 1
> * A topic that is produced to using gzip compression
> * A topic that has min.isr set to less than the number of replicas, (i.e. 
> min.isr=2, #replicas=3)
> * Following ISRs are lagging behind the leader by some small number of 
> messages, (which is normal with acks=1)
> * brokers are configured with fairly large zk session timeout e.g. 30s.
> Then:
> When something like a power outage take out all three replicas, its possible 
> to get into a state such that the indexes won’t rebuild on a restart and a 
> broker fails to start. This can happen when:
> * Enough brokers, but not the pre-outage leader, come on-line for the 
> partition to be writeable
> * Producers produce enough records to the partition that the head offset is 
> now greater than the pre-outage leader head offset.
> * The pre-outage leader comes back online.
> At this point the logs on the pre-outage leader have diverged from the other 
> replicas.  It has some messages that are not in the other replicas, and the 
> other replicas have some records not in the pre-outage leader's log - at the 
> same offsets.
> I’m assuming that because the current leader has at higher offset than the 
> pre-outage leader, the pre-outage leader just starts following the leader and 
> requesting the records it thinks its missing.
> I’m also assuming that because the producers were using gzip, so each record 
> is actual a compressed message 

[jira] [Resolved] (KAFKA-3918) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-06-30 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates resolved KAFKA-3918.

Resolution: Duplicate

> Broker faills to start after ungraceful shutdown due to non-monotonically 
> incrementing offsets in logs
> --
>
> Key: KAFKA-3918
> URL: https://issues.apache.org/jira/browse/KAFKA-3918
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.1
>Reporter: Andy Coates
>
> Hi All,
> I encountered an issue with Kafka following a power outage that saw a 
> proportion of our cluster disappear. When the power came back on several 
> brokers halted on start up with the error:
> {noformat}
>   Fatal error during KafkaServerStartable startup. Prepare to shutdown”
>   kafka.common.InvalidOffsetException: Attempt to append an offset 
> (1239742691) to position 35728 no larger than the last offset appended 
> (1239742822) to 
> /data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
>   at 
> kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>   at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
>   at kafka.log.LogSegment.recover(LogSegment.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
>   at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
>   at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>   at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>   at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>   at kafka.log.Log.loadSegments(Log.scala:160)
>   at kafka.log.Log.(Log.scala:90)
>   at 
> kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
>   at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The only way to recover the brokers was to delete the log files that 
> contained non monotonically incrementing offsets.
> I’ve spent some time digging through the logs and I feel I may have worked 
> out the sequence of events leading to this issue, (though this is based on 
> some assumptions I've made about the way Kafka is working, which may be 
> wrong):
> Given:
> * A topic that is produced to using acks = 1
> * A topic that is produced to using gzip compression
> * A topic that has min.isr set to less than the number of replicas, (i.e. 
> min.isr=2, #replicas=3)
> * Following ISRs are lagging behind the leader by some small number of 
> messages, (which is normal with acks=1)
> * brokers are configured with fairly large zk session timeout e.g. 30s.
> Then:
> When something like a power outage take out all three replicas, its possible 
> to get into a state such that the indexes won’t rebuild on a restart and a 
> broker fails to start. This can happen when:
> * Enough brokers, but not the pre-outage leader, come on-line for the 
> partition to be writeable
> * Producers produce enough records to the partition that the head offset is 
> now greater than the pre-outage leader head offset.
> * The pre-outage leader comes back online.
> At this point the logs on the pre-outage leader have diverged from the other 
> replicas.  It has some messages that are not in the other replicas, and the 
> other replicas have some records not in the pre-outage leader's log.
> I’m assuming that because the current leader has a higher offset that the 
> pre-outage leader, the pre-outage leader just starts following the leader and 
> requesting the records it thinks its missing.
> I’m also assuming that because the producers were using gzip, so each record 
> is actual a compressed message set, that when the pre-outage leader requests 
> records from the leader, the offset it requests just happened to be in the 
> middle of a compressed batch, but the leader returned the full batch.  When 
> the pre-outage leader appends this batch to its own log it thinks all is OK. 
> But what has 

[jira] [Updated] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-06-29 Thread Andy Coates (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andy Coates updated KAFKA-3919:
---
Description: 
Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

I’ve spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (e.g. 
min.isr=2, #replicas=3)
* Followers in the ISR are lagging behind the leader by some small number of 
messages (which is normal with acks=1)
* Brokers are configured with a fairly large zk session timeout, e.g. 30s.
* Brokers are configured so that unclean leader elections are disabled.
Then:
When something like a power outage take out all three replicas, its possible to 
get into a state such that the indexes won’t rebuild on a restart and a broker 
fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come on-line for the partition 
to be writeable
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas.  It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log - at the 
same offsets.

I’m assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it’s missing.

I’m also assuming that, because the producers were using gzip, each record is 
actually a compressed message set, so when the pre-outage leader requests 
records from the leader, the offset it requests could just happen to be in the 
middle of a compressed batch, but the leader returns the full batch. When the 
pre-outage leader appends this batch to its own log it thinks all is OK. But 
what has happened is that the offsets in the log are no longer monotonically 
incrementing. Instead they actually dip by the number of records in the 
compressed batch that were before the requested offset. If and when this 
broker restarts, this dip may be at the 4K boundary the indexer checks. If it 
is, the broker won’t start.

Several of our brokers were unlucky enough to hit that 4K boundary, causing a 
protracted outage.  We’ve written a little utility that shows several more 
brokers have a dip outside of the 4K boundary.
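
(Roughly, the kind of scan that utility does - this is not the actual tool, 
just a sketch. It assumes you've already dumped a segment to text with 
kafka.tools.DumpLogSegments and that each record line contains "offset: <n>"; 
the exact dump format varies between Kafka versions, so the parsing is an 
assumption.)

{code:java}
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

/**
 * Sketch of a dip scanner: reads the text output of DumpLogSegments for one
 * segment and reports every place where the printed offset is not larger than
 * the previous one, i.e. the non-monotonic dips described above.
 */
public class OffsetDipScanner {
    public static void main(String[] args) throws IOException {
        long previous = -1L;
        long lineNo = 0;
        try (BufferedReader in = new BufferedReader(new FileReader(args[0]))) {
            String line;
            while ((line = in.readLine()) != null) {
                lineNo++;
                int idx = line.indexOf("offset: ");
                if (idx < 0) {
                    continue;                       // not a record line
                }
                String rest = line.substring(idx + "offset: ".length()).trim();
                long offset = Long.parseLong(rest.split("\\s+")[0]);
                if (previous >= 0 && offset <= previous) {
                    System.out.printf("line %d: offset dips from %d to %d%n",
                            lineNo, previous, offset);
                }
                previous = offset;
            }
        }
    }
}
{code}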

There are some assumptions 

[jira] [Created] (KAFKA-3918) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-06-29 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-3918:
--

 Summary: Broker faills to start after ungraceful shutdown due to 
non-monotonically incrementing offsets in logs
 Key: KAFKA-3918
 URL: https://issues.apache.org/jira/browse/KAFKA-3918
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.9.0.1
Reporter: Andy Coates


Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

I’ve spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (e.g. 
min.isr=2, #replicas=3)
* Followers in the ISR are lagging behind the leader by some small number of 
messages (which is normal with acks=1)
* Brokers are configured with a fairly large zk session timeout, e.g. 30s.

Then:
When something like a power outage takes out all three replicas, it’s possible 
to get into a state such that the indexes won’t rebuild on a restart and a 
broker fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come online for the partition 
to be writeable
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader's head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas.  It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log.

I’m assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it’s missing.

I’m also assuming that, because the producers were using gzip, each record is 
actually a compressed message set, so when the pre-outage leader requests 
records from the leader, the offset it requested just happened to be in the 
middle of a compressed batch, but the leader returned the full batch. When the 
pre-outage leader appends this batch to its own log it thinks all is OK. But 
what has happened is that the offsets in the log are no longer monotonically 
incrementing. Instead they actually dip by the number of records in the 
compressed batch that were before the requested offset. If and when this 
broker restarts, this dip may be at the 4K boundary the indexer checks. If it 
is, the broker won’t start.

Several of our brokers 

[jira] [Created] (KAFKA-3919) Broker faills to start after ungraceful shutdown due to non-monotonically incrementing offsets in logs

2016-06-29 Thread Andy Coates (JIRA)
Andy Coates created KAFKA-3919:
--

 Summary: Broker faills to start after ungraceful shutdown due to 
non-monotonically incrementing offsets in logs
 Key: KAFKA-3919
 URL: https://issues.apache.org/jira/browse/KAFKA-3919
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 0.9.0.1
Reporter: Andy Coates


Hi All,

I encountered an issue with Kafka following a power outage that saw a 
proportion of our cluster disappear. When the power came back on several 
brokers halted on start up with the error:

{noformat}
Fatal error during KafkaServerStartable startup. Prepare to shutdown”
kafka.common.InvalidOffsetException: Attempt to append an offset 
(1239742691) to position 35728 no larger than the last offset appended 
(1239742822) to 
/data3/kafka/mt_xp_its_music_main_itsevent-20/001239444214.index.
at 
kafka.log.OffsetIndex$$anonfun$append$1.apply$mcV$sp(OffsetIndex.scala:207)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.log.OffsetIndex$$anonfun$append$1.apply(OffsetIndex.scala:197)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
at kafka.log.OffsetIndex.append(OffsetIndex.scala:197)
at kafka.log.LogSegment.recover(LogSegment.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:188)
at kafka.log.Log$$anonfun$loadSegments$4.apply(Log.scala:160)
at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.log.Log.loadSegments(Log.scala:160)
at kafka.log.Log.(Log.scala:90)
at 
kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$3$$anonfun$apply$10$$anonfun$apply$1.apply$mcV$sp(LogManager.scala:150)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:60)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}

The only way to recover the brokers was to delete the log files that contained 
non monotonically incrementing offsets.

I’ve spent some time digging through the logs and I feel I may have worked out 
the sequence of events leading to this issue, (though this is based on some 
assumptions I've made about the way Kafka is working, which may be wrong):

Given:
* A topic that is produced to using acks = 1
* A topic that is produced to using gzip compression
* A topic that has min.isr set to less than the number of replicas (e.g. 
min.isr=2, #replicas=3)
* Followers in the ISR are lagging behind the leader by some small number of 
messages (which is normal with acks=1)
* Brokers are configured with a fairly large zk session timeout, e.g. 30s.

Then:
When something like a power outage takes out all three replicas, it’s possible 
to get into a state such that the indexes won’t rebuild on a restart and a 
broker fails to start. This can happen when:
* Enough brokers, but not the pre-outage leader, come online for the partition 
to be writeable
* Producers produce enough records to the partition that the head offset is now 
greater than the pre-outage leader's head offset.
* The pre-outage leader comes back online.

At this point the logs on the pre-outage leader have diverged from the other 
replicas.  It has some messages that are not in the other replicas, and the 
other replicas have some records not in the pre-outage leader's log.

I’m assuming that because the current leader has a higher offset than the 
pre-outage leader, the pre-outage leader just starts following the leader and 
requesting the records it thinks it’s missing.

I’m also assuming that, because the producers were using gzip, each record is 
actually a compressed message set, so when the pre-outage leader requests 
records from the leader, the offset it requested just happened to be in the 
middle of a compressed batch, but the leader returned the full batch. When the 
pre-outage leader appends this batch to its own log it thinks all is OK. But 
what has happened is that the offsets in the log are no longer monotonically 
incrementing. Instead they actually dip by the number of records in the 
compressed batch that were before the requested offset. If and when this 
broker restarts, this dip may be at the 4K boundary the indexer checks. If it 
is, the broker won’t start.

Several of our brokers