[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-03-28 Thread xiongqi wu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804577#comment-16804577
 ] 

xiongqi wu commented on KAFKA-7362:
---

Yes, I am still looking forward to fixing this issue.

I put this item on hold because:

1)  there is no active discussion on KIP-370. 

2)  I am also thinking about combining this with topic deletion. Today a topic 
can only be deleted when all related brokers are online. This causes topic 
deletion to get stuck if a broker fails and never comes back. I was thinking of 
combining this feature with a change to the topic deletion state machine, so 
that topic deletion can always succeed and orphan partitions can be safely 
removed when offline brokers come back online.

I will update the ticket and KIP-370 after I think a little more about item 2. 
 

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline 
> broker, those removed partitions become orphan partitions to the broker. 
> When the offline broker comes back online, it is not able to clean up either 
> the data or the folders that belong to orphan partitions. The log manager 
> scans all log dirs during startup, but the time-based retention policy on a 
> topic partition does not kick in until the broker is either a follower or a 
> leader of the partition. In addition, we have no logic today to delete 
> folders that belong to orphan partitions. 
> Opening this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.
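> A minimal sketch of the kind of check such a mechanism could perform: compare 
> the partition directories on disk against the partitions currently assigned 
> to the broker. The helper name and the dir-name parsing are illustrative, not 
> the actual proposal.
> {code:java}
> import java.io.File;
> import java.util.HashSet;
> import java.util.Set;
> import org.apache.kafka.common.TopicPartition;
> 
> public class OrphanPartitionSketch {
>     // A partition dir is "orphan" if it exists on disk but the broker is no
>     // longer assigned that partition.
>     static Set<File> findOrphanPartitionDirs(File logDir, Set<TopicPartition> assigned) {
>         Set<File> orphans = new HashSet<>();
>         File[] dirs = logDir.listFiles(File::isDirectory);
>         if (dirs == null) return orphans;
>         for (File dir : dirs) {
>             // partition dirs are named <topic>-<partition>, e.g. "test4-1"
>             int dash = dir.getName().lastIndexOf('-');
>             if (dash < 0) continue;
>             String suffix = dir.getName().substring(dash + 1);
>             if (suffix.isEmpty() || !suffix.chars().allMatch(Character::isDigit)) continue;
>             String topic = dir.getName().substring(0, dash);
>             if (!assigned.contains(new TopicPartition(topic, Integer.parseInt(suffix))))
>                 orphans.add(dir);
>         }
>         return orphans;
>     }
> }
> {code}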



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8157) Missing "key.serializer" exception when setting "segment index bytes"

2019-03-28 Thread Travis Brad Ellis (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804539#comment-16804539
 ] 

Travis Brad Ellis commented on KAFKA-8157:
--

I'd like to work on this. Newbie here. I sent a request to be added as a 
contributor via d...@kafka.apache.org. Planning to assign it to myself once 
added.

 

> Missing "key.serializer" exception when setting "segment index bytes"
> -
>
> Key: KAFKA-8157
> URL: https://issues.apache.org/jira/browse/KAFKA-8157
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: ubuntu 18.10, localhost and Aiven too
>Reporter: Cristian D
>Priority: Major
>  Labels: beginner, newbie
>
> As a `kafka-streams` user,
> When I set the "segment index bytes" property
> Then I would like to have internal topics with the specified allocated disk 
> space
>  
> At the moment, when setting the "topic.segment.index.bytes" property, the 
> application exits with the following exception: 
> {code:java}
> Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
> Missing required configuration "key.serializer" which has no default value.
> {code}
> Tested with `kafka-streams` v2.0.0 and v2.2.0.
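> A minimal sketch (topology and broker address are hypothetical) of a 
> configuration that reproduces this, per the report:
> {code:java}
> import java.util.Properties;
> import org.apache.kafka.streams.KafkaStreams;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> 
> public class SegmentIndexDemo {
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "segment-index-demo");
>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         // equivalent to setting "topic.segment.index.bytes" directly
>         props.put(StreamsConfig.topicPrefix("segment.index.bytes"), "10485760");
> 
>         StreamsBuilder builder = new StreamsBuilder();
>         builder.stream("input").to("output");
>         new KafkaStreams(builder.build(), props); // throws ConfigException per this report
>     }
> }
> {code}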
>  
> Stack trace:
> {code:java}
> Exception in thread "main" org.apache.kafka.common.config.ConfigException: 
> Missing required configuration "key.serializer" which has no default value.
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:464)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:62)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:75)
>  at 
> org.apache.kafka.clients.producer.ProducerConfig.(ProducerConfig.java:392)
>  at 
> org.apache.kafka.streams.StreamsConfig.getMainConsumerConfigs(StreamsConfig.java:1014)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:666)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:718)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:634)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:544)
>  at app.Main.main(Main.java:36)
> {code}
> A demo application simulating the exception:
> https://github.com/razorcd/java-snippets-and-demo-projects/tree/master/kafkastreamsdemo
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-03-28 Thread huxihx (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804533#comment-16804533
 ] 

huxihx commented on KAFKA-7965:
---

Part of the failure is caused by the fact that the rebalance did not finish as 
expected: some consumers were not assigned any partitions. I can steadily 
reproduce this issue but still cannot figure out what's going wrong with 
`waitForRebalance`.

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when LogValidator does validation.

2019-03-28 Thread hackerwin7 (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804532#comment-16804532
 ] 

hackerwin7 commented on KAFKA-8106:
---

Maybe we can move the record-level validation to the client side or elsewhere, 
and add a validation field to the record message format's header? On the broker 
side, the broker could then check this field and skip record-level validation.

> Remove unnecessary decompression operation when LogValidator does validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
> Environment: Server: 
> CPU: 2*16; 
> MemTotal: 256G; 
> Ethernet controller: Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
> We did performance testing of Kafka in the specific scenario described 
> below. We built a Kafka cluster with one broker and created topics with 
> different numbers of partitions. Then we started many producer processes to 
> send large amounts of messages to one of the topics in each test.
> *_Specific Scenario_*
>  *_1. Main config of Kafka_*
>  # Main config of the Kafka server: num.network.threads=6; 
> num.io.threads=128; queued.max.requests=500
>  # Number of TopicPartitions: 50~2000
>  # Size of a single message: 1024B
>  *_2. Config of KafkaProducer_*
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3. The best result of performance testing_*
> ||Network inflow rate||CPU used (%)||Disk write speed||Production throughput||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4. Phenomenon and my doubt_*
> _The upper limit of CPU usage has been reached, but the upper limit of the 
> server's network bandwidth has not. *We are unsure what costs so much CPU 
> time, and we want to improve performance and reduce the CPU usage of the 
> Kafka server.*_
>  _*5. Analysis*_
> We analyzed a JFR recording of the Kafka server taken during performance 
> testing and found the hot-spot methods to be 
> *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and 
> *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. 
> When checking thread stack information, we also found most CPU time consumed 
> by many threads busy decompressing messages. Then we read the Kafka source 
> code.
> There is a double-layer nested iterator when LogValidator validates every 
> record, and each message is decompressed while traversing every RecordBatch 
> iterator. This decompression consumes CPU and hurts total performance. _*The 
> only purpose of decompressing every message is to obtain the total size in 
> bytes of one record and the size in bytes of the record body, when the magic 
> value in use is above 1 and no format conversion or value overwriting is 
> required for compressed messages. This is negative for performance in common 
> usage scenarios.*_ Therefore, we suggest *_removing this unnecessary 
> decompression operation_* when validating compressed messages, when the magic 
> value in use is above 1 and no format conversion or value overwriting is 
> required.
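> A hedged sketch of the double-layer iteration described above (the method and 
> the per-record check are illustrative, not the actual LogValidator code); 
> iterating a compressed batch's records is what forces decompression:
> {code:java}
> import org.apache.kafka.common.record.MemoryRecords;
> import org.apache.kafka.common.record.MutableRecordBatch;
> import org.apache.kafka.common.record.Record;
> 
> public class NestedIterationSketch {
>     static void validateRecords(MemoryRecords records) {
>         for (MutableRecordBatch batch : records.batches()) { // outer layer: per batch
>             for (Record record : batch) {                    // inner layer: iterating decompresses
>                 if (record.sizeInBytes() <= 0)               // stand-in for per-record checks
>                     throw new IllegalStateException("invalid record size");
>             }
>         }
>     }
> }
> {code}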



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8174) Can't call arbitrary SimpleBenchmarks tests from streams_simple_benchmark_test.py

2019-03-28 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8174:
--

 Summary: Can't call arbitrary SimpleBenchmarks tests from 
streams_simple_benchmark_test.py
 Key: KAFKA-8174
 URL: https://issues.apache.org/jira/browse/KAFKA-8174
 Project: Kafka
  Issue Type: Bug
Reporter: Sophie Blee-Goldman


When using the script streams_simple_benchmark_test.py, you should be able to 
specify a test name and run that particular method in SimpleBenchmarks. This 
works for most existing benchmarks; however, you can't use it to run the 
"yahoo" benchmark, and you can't add new tests to SimpleBenchmarks and start 
them successfully.

If you try to run the yahoo benchmark or a new test, it fails in main() with 
the error "Not enough parameters are provided; expecting propFileName, 
testName, numRecords, keySkew, valueSize"; the missing argument turns out to 
be testName.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-03-28 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804470#comment-16804470
 ] 

Dhruvil Shah commented on KAFKA-7362:
-

[~xiongqiwu] are you planning to work on this JIRA? If not, I could take a stab 
at fixing it.

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline 
> broker, those removed partitions become orphan partitions to the broker. 
> When the offline broker comes back online, it is not able to clean up either 
> the data or the folders that belong to orphan partitions. The log manager 
> scans all log dirs during startup, but the time-based retention policy on a 
> topic partition does not kick in until the broker is either a follower or a 
> leader of the partition. In addition, we have no logic today to delete 
> folders that belong to orphan partitions. 
> Opening this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering

2019-03-28 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8159:
---
Description: 
Currently we assume consistent ordering between serialized and deserialized 
keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes 
will also obey bytesA < bytesB < bytesC. This is not true in general of the 
built-in serdes for signed numerical types (e.g. Integer, Long). Specifically, 
it is broken by the negative number representations, which are 
lexicographically greater than (all) positive number representations.

One consequence of this is that an interactive query of a key range with a 
negative lower bound and a positive upper bound (e.g. keyValueStore.range(-1, 1)) 
will result in "unexpected behavior" depending on the specific store type.

For RocksDB stores with caching disabled, an empty iterator will be returned 
regardless of whether any records exist in that range.

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception and crash.
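Not from the ticket, but a worked example of the violation: Kafka's 
IntegerSerializer writes big-endian two's complement, so -1 serializes to 
FF FF FF FF, which compares lexicographically greater (as unsigned bytes) than 
1 -> 00 00 00 01.
{code:java}
import org.apache.kafka.common.serialization.IntegerSerializer;

public class SignedOrderingDemo {
    public static void main(String[] args) {
        IntegerSerializer ser = new IntegerSerializer();
        byte[] neg = ser.serialize("topic", -1); // FF FF FF FF
        byte[] pos = ser.serialize("topic", 1);  // 00 00 00 01
        // byte-based stores compare keys as unsigned bytes:
        int cmp = Integer.compare(neg[0] & 0xFF, pos[0] & 0xFF);
        System.out.println(cmp > 0); // true: bytes(-1) > bytes(1), although -1 < 1
    }
}
{code}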

  was:
If a user creates a queryable state store using one of the signed built-in 
serdes (e.g. Integer) for the key, there is nothing preventing records with 
negative keys from being inserted and/or fetched individually. However, if the 
user tries to query the store for a range of keys starting with a negative 
number, the resulting unexpected behavior is store-specific.

 

For RocksDB stores with caching disabled, an empty iterator will be returned. 

For in-memory stores and ANY store with caching enabled, Streams will throw an 
unchecked exception.

 

This situation should be handled more gracefully, or users should be informed 
of this limitation; at minimum, the result should be consistent across store 
types.


> Built-in serdes for signed numbers do not obey lexicographical ordering
> ---
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Currently we assume consistent ordering between serialized and deserialized 
> keys, e.g. if the objects obey objA < objB < objC then the serialized Bytes 
> will also obey bytesA < bytesB < bytesC. This is not true in general of the 
> built-in serdes for signed numerical types (e.g. Integer, Long). Specifically, 
> it is broken by the negative number representations, which are 
> lexicographically greater than (all) positive number representations. 
>  
> One consequence of this is that an interactive query of a key range with a 
> negative lower bound and a positive upper bound (e.g. keyValueStore.range(-1, 1)) 
> will result in "unexpected behavior" depending on the specific store type.
>  
> For RocksDB stores with caching disabled, an empty iterator will be returned 
> regardless of whether any records exist in that range. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception and crash.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-03-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804456#comment-16804456
 ] 

ASF GitHub Bot commented on KAFKA-7502:
---

dongjinleekr commented on pull request #6520: KAFKA-7502: Cleanup KTable 
materialization logic in a single place
URL: https://github.com/apache/kafka/pull/6520
 
 
   This PR is the final part of KAFKA-7502, which cleans up the 
`KTableImpl#doMapValues` method (follow-up of #6174, #6453, and #6519).
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> Today, since we pre-create all the `KTableXXX` operators along with the 
> logical nodes, we are effectively duplicating the logic that determines 
> whether the resulting KTable should be materialized. More specifically, the 
> materialization principle today is:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name, we always materialize.
> 2) If users specified Materialized in the DSL but it does not contain a 
> queryable name, or if users do not specify a Materialized object at all, 
> Streams may choose to materialize or not. But in any case, even if the KTable 
> is materialized it will not be queryable, since there's no queryable name 
> (i.e. only storeName is not null, but queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize 
> since materialization is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the 
> materialization until a downstream operator requires this KTable to be 
> materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize if users 
> create a Materialized object, even without a queryable name. However, this 
> can be optimized similarly to 2.b), which is orthogonal to this ticket (see 
> `KTableImpl#buildJoin`, where we always use the constructor with nameProvider 
> != null).
> 2.d) If the resulting KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> 
> Now, in all of these cases, we have logical nodes like `KTableKTableJoinNode` 
> as well as physical nodes like `ProcessorNode`. Ideally we should always 
> create the logical plan (i.e. the StreamsGraph) first, optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology). However, 
> today we create some physical nodes beforehand, so the above logic is 
> duplicated in the creation of both physical and logical nodes. For example, 
> in `KTableKTableJoinNode` we check whether Materialized is null when adding a 
> state store, and in `KTableImpl#doJoin` we check whether Materialized is 
> specified (case 2.c) above). 
> Another example is `TableProcessorNode`, which is used for 2.d) above and 
> includes this logic, while its caller, `KTableImpl#doFilter` for example, 
> also contains the logic for deciding whether to pass the `queryableName` 
> parameter to `KTableProcessorSupplier`.
> This is bug-prone, since we may update the logic in one class but forget to 
> update the other.
> --
> What we want is a cleaner code path similar to what we have for 2.b): when 
> creating the logical nodes we keep track of whether 1) Materialized is 
> specified, and 2) a queryable name is provided. During the optimization 
> phase, we may change the inner physical ProcessorBuilder's parameters, such 
> as the queryable name, and then when it is time to generate the physical 
> node, we can just blindly take the parameters and go for it.
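> A hedged illustration of cases 1) and 2) above (topic names and serdes are 
> hypothetical):
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.common.utils.Bytes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.kstream.KTable;
> import org.apache.kafka.streams.kstream.Materialized;
> import org.apache.kafka.streams.state.KeyValueStore;
> 
> public class MaterializationCases {
>     public static void main(String[] args) {
>         StreamsBuilder builder = new StreamsBuilder();
>         // Case 1): queryable name given -> always materialized, and queryable.
>         KTable<String, Long> queryable = builder.table("topic-a",
>                 Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("case1-store"));
>         // Case 2): no queryable name -> Streams may or may not materialize;
>         // even if it does, the store is not queryable.
>         KTable<String, Long> notQueryable = builder.table("topic-b",
>                 Materialized.with(Serdes.String(), Serdes.Long()));
>     }
> }
> {code}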



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8159) Built-in serdes for signed numbers do not obey lexicographical ordering

2019-03-28 Thread Sophie Blee-Goldman (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8159:
---
Summary: Built-in serdes for signed numbers do not obey lexicographical 
ordering  (was: Multi-key range queries with negative keyFrom results in 
unexpected behavior)

> Built-in serdes for signed numbers do not obey lexicographical ordering
> ---
>
> Key: KAFKA-8159
> URL: https://issues.apache.org/jira/browse/KAFKA-8159
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> If a user creates a queryable state store using one of the signed built-in 
> serdes (e.g. Integer) for the key, there is nothing preventing records with 
> negative keys from being inserted and/or fetched individually. However, if 
> the user tries to query the store for a range of keys starting with a 
> negative number, the resulting unexpected behavior is store-specific.
>  
> For RocksDB stores with caching disabled, an empty iterator will be returned. 
> For in-memory stores and ANY store with caching enabled, Streams will throw 
> an unchecked exception.
>  
> This situation should be handled more gracefully, or users should be informed 
> of this limitation; at minimum, the result should be consistent across store 
> types.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-03-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804449#comment-16804449
 ] 

ASF GitHub Bot commented on KAFKA-7502:
---

dongjinleekr commented on pull request #6519: KAFKA-7502: Cleanup KTable 
materialization logic in a single place
URL: https://github.com/apache/kafka/pull/6519
 
 
   This PR is a follow-up of #6174 and #6453, which cleans up the 
`KTableImpl#doTransformValues` method.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> Today, since we pre-create all the `KTableXXX` operators along with the 
> logical nodes, we are effectively duplicating the logic that determines 
> whether the resulting KTable should be materialized. More specifically, the 
> materialization principle today is:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name, we always materialize.
> 2) If users specified Materialized in the DSL but it does not contain a 
> queryable name, or if users do not specify a Materialized object at all, 
> Streams may choose to materialize or not. But in any case, even if the KTable 
> is materialized it will not be queryable, since there's no queryable name 
> (i.e. only storeName is not null, but queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize 
> since materialization is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the 
> materialization until a downstream operator requires this KTable to be 
> materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize if users 
> create a Materialized object, even without a queryable name. However, this 
> can be optimized similarly to 2.b), which is orthogonal to this ticket (see 
> `KTableImpl#buildJoin`, where we always use the constructor with nameProvider 
> != null).
> 2.d) If the resulting KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> 
> Now, in all of these cases, we have logical nodes like `KTableKTableJoinNode` 
> as well as physical nodes like `ProcessorNode`. Ideally we should always 
> create the logical plan (i.e. the StreamsGraph) first, optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology). However, 
> today we create some physical nodes beforehand, so the above logic is 
> duplicated in the creation of both physical and logical nodes. For example, 
> in `KTableKTableJoinNode` we check whether Materialized is null when adding a 
> state store, and in `KTableImpl#doJoin` we check whether Materialized is 
> specified (case 2.c) above). 
> Another example is `TableProcessorNode`, which is used for 2.d) above and 
> includes this logic, while its caller, `KTableImpl#doFilter` for example, 
> also contains the logic for deciding whether to pass the `queryableName` 
> parameter to `KTableProcessorSupplier`.
> This is bug-prone, since we may update the logic in one class but forget to 
> update the other.
> --
> What we want is a cleaner code path similar to what we have for 2.b): when 
> creating the logical nodes we keep track of whether 1) Materialized is 
> specified, and 2) a queryable name is provided. During the optimization 
> phase, we may change the inner physical ProcessorBuilder's parameters, such 
> as the queryable name, and then when it is time to generate the physical 
> node, we can just blindly take the parameters and go for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8173) Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1

2019-03-28 Thread Amit Anand (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Amit Anand updated KAFKA-8173:
--
Description: 
After upgrading Kafka from 0.10.2.2 to 2.1.1, warnings of the form "Corrupt 
time index found, time index file ..." start appearing for all topics.

{code:java}
[2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6, 
dir=/apps/kafka/data] Found a corrupted index file corresponding to log file 
/apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time 
index found, time index file 
(/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero 
size but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... 
(kafka.log.Log) 
[2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3, 
dir=/apps/kafka/data] Found a corrupted index file corresponding to log file 
/apps/kafka/data/NewTopic-3/0494.log due to Corrupt time index 
found, time index file 
(/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero size 
but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... 
(kafka.log.Log) [2019-03-28 17:23:55.877+] WARN [Log 
partition=SecondTopic-3, dir=/apps/kafka/data] Found a corrupted index file 
corresponding to log file 
/apps/kafka/data/SecondTopic-3/0494.log due to Corrupt time 
index found, time index file 
(/apps/kafka/data/SecondTopic-3/0494.timeindex) has non-zero 
size but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... (kafka.log.Log)
 
{code}


  was:
After upgrading Kafka from 0.10.2.2 to 2.1.1, warnings of the form "Corrupt 
time index found, time index file ..." start appearing for all topics.

{code:java}
[2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6, 
dir=/apps/kafka/data] Found a corrupted index file corresponding to log file 
/apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time 
index found, time index file 
(/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero 
size but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... 
(kafka.log.Log) 
[2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3, 
dir=/apps/kafka/data] Found a corrupted index file corresponding to log file 
/apps/kafka/data/NewTopic-3/0494.log due to Corrupt time index 
found, time index file 
(/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero size 
but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... 
(kafka.log.Log) [2019-03-28 17:23:55.877+] WARN [Log 
partition=SecondTopic-3, dir=/apps/kafka/data] Found a corrupted index file 
corresponding to log file 
/apps/kafka/data/SecondTopic-3/0494.log due to Corrupt time 
index found, time index file 
(/apps/kafka/data/SecondTopic-3/0494.timeindex) has non-zero 
size but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... 
(kafka.log.Log) 
 
{code}



> Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1 
> --
>
> Key: KAFKA-8173
> URL: https://issues.apache.org/jira/browse/KAFKA-8173
> Project: Kafka
>  Issue Type: Improvement
>  Components: offset manager
>Reporter: Amit Anand
>Priority: Major
>
> After upgrading Kafka from 0.10.2.2 to 2.1.1, warnings of the form "Corrupt 
> time index found, time index file ..." start appearing for all topics.
> {code:java}
> [2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6, 
> dir=/apps/kafka/data] Found a corrupted index file corresponding to log file 
> /apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time 
> index found, time index file 
> (/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero 
> size but the last timestamp is 0 which is less than the first timestamp 
> 1553720469480}, recovering segment and rebuilding index files... 
> (kafka.log.Log) 
> [2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3, 
> dir=/apps/kafka/data] Found a corrupted index file corresponding to log file 
> /apps/kafka/data/NewTopic-3/0494.log due to Corrupt time 
> index found, time index file 
> (/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero 
> size but the last timestamp is 0 which is less than the first timestamp 
> 1553720469480}, recovering segment and rebuilding index files... 
> 

[jira] [Created] (KAFKA-8173) Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1

2019-03-28 Thread Amit Anand (JIRA)
Amit Anand created KAFKA-8173:
-

 Summary: Kafka Errors after version upgrade from 0.10.2.2 to 2.1.1 
 Key: KAFKA-8173
 URL: https://issues.apache.org/jira/browse/KAFKA-8173
 Project: Kafka
  Issue Type: Improvement
  Components: offset manager
Reporter: Amit Anand


After upgrading Kafka from 0.10.2.2 to 2.1.1, warnings of the form "Corrupt 
time index found, time index file ..." start appearing for all topics.

{code:java}
[2019-03-28 17:23:55.877+] WARN [Log partition=FirstTopic-6, 
dir=/apps/kafka/data] Found a corrupted index file corresponding to log file 
/apps/kafka/data/FirstTopic-6/0494.log due to Corrupt time 
index found, time index file 
(/apps/kafka/data/FirstTopic-6/0494.timeindex) has non-zero 
size but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... 
(kafka.log.Log) 
[2019-03-28 17:23:55.877+] WARN [Log partition=NewTopic-3, 
dir=/apps/kafka/data] Found a corrupted index file corresponding to log file 
/apps/kafka/data/NewTopic-3/0494.log due to Corrupt time index 
found, time index file 
(/apps/kafka/data/NewTopic-3/0494.timeindex) has non-zero size 
but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... 
(kafka.log.Log) [2019-03-28 17:23:55.877+] WARN [Log 
partition=SecondTopic-3, dir=/apps/kafka/data] Found a corrupted index file 
corresponding to log file 
/apps/kafka/data/SecondTopic-3/0494.log due to Corrupt time 
index found, time index file 
(/apps/kafka/data/SecondTopic-3/0494.timeindex) has non-zero 
size but the last timestamp is 0 which is less than the first timestamp 
1553720469480}, recovering segment and rebuilding index files... 
(kafka.log.Log) 
 
{code}




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-03-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804297#comment-16804297
 ] 

ASF GitHub Bot commented on KAFKA-7502:
---

guozhangwang commented on pull request #6453: KAFKA-7502: Cleanup KTable 
materialization logic in a single place (filter)
URL: https://github.com/apache/kafka/pull/6453
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> Today, since we pre-create all the `KTableXXX` operators along with the 
> logical nodes, we are effectively duplicating the logic that determines 
> whether the resulting KTable should be materialized. More specifically, the 
> materialization principle today is:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name, we always materialize.
> 2) If users specified Materialized in the DSL but it does not contain a 
> queryable name, or if users do not specify a Materialized object at all, 
> Streams may choose to materialize or not. But in any case, even if the KTable 
> is materialized it will not be queryable, since there's no queryable name 
> (i.e. only storeName is not null, but queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize 
> since materialization is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the 
> materialization until a downstream operator requires this KTable to be 
> materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize if users 
> create a Materialized object, even without a queryable name. However, this 
> can be optimized similarly to 2.b), which is orthogonal to this ticket (see 
> `KTableImpl#buildJoin`, where we always use the constructor with nameProvider 
> != null).
> 2.d) If the resulting KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> 
> Now, in all of these cases, we have logical nodes like `KTableKTableJoinNode` 
> as well as physical nodes like `ProcessorNode`. Ideally we should always 
> create the logical plan (i.e. the StreamsGraph) first, optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology). However, 
> today we create some physical nodes beforehand, so the above logic is 
> duplicated in the creation of both physical and logical nodes. For example, 
> in `KTableKTableJoinNode` we check whether Materialized is null when adding a 
> state store, and in `KTableImpl#doJoin` we check whether Materialized is 
> specified (case 2.c) above). 
> Another example is `TableProcessorNode`, which is used for 2.d) above and 
> includes this logic, while its caller, `KTableImpl#doFilter` for example, 
> also contains the logic for deciding whether to pass the `queryableName` 
> parameter to `KTableProcessorSupplier`.
> This is bug-prone, since we may update the logic in one class but forget to 
> update the other.
> --
> What we want is a cleaner code path similar to what we have for 2.b): when 
> creating the logical nodes we keep track of whether 1) Materialized is 
> specified, and 2) a queryable name is provided. During the optimization 
> phase, we may change the inner physical ProcessorBuilder's parameters, such 
> as the queryable name, and then when it is time to generate the physical 
> node, we can just blindly take the parameters and go for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process

2019-03-28 Thread Bharat Kondeti (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bharat Kondeti updated KAFKA-8172:
--
Description: 
Fix to close file handlers before renaming files / directories and open them 
back if required

Following are the file renaming scenarios:
 * Files are renamed to .deleted so they can be deleted
 * .cleaned files are renamed to .swap as part of Log.replaceSegments flow
 * .swap files are renamed to original files as part of Log.replaceSegments flow

Following are the folder renaming scenarios:
 * When a topic is marked for deletion, folder is renamed
 * As part of replacing current logs with future logs in LogManager

In the above scenarios, if file handles are not closed, we get a file access 
violation exception.

The idea is to close the logs and file segments before doing a rename, and 
open them back up if required.
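An illustrative pattern only (not the actual Log/LogSegment change): on 
Windows, an open handle blocks the rename, so close first, move, then reopen 
if the file is still needed.
{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class CloseThenRename {
    static FileChannel renameReopen(FileChannel open, Path from, Path to) throws IOException {
        open.close();                                         // release the handle Windows holds
        Files.move(from, to, StandardCopyOption.ATOMIC_MOVE); // rename can now succeed
        return FileChannel.open(to, StandardOpenOption.READ,  // reopen on the new path
                StandardOpenOption.WRITE);
    }
}
{code}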

*Segments Deletion Scenario*

[2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in 
dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel)

java.nio.file.FileSystemException: 
D:\data\Kafka\kafka-logs\test4-1\.log -> 
D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process 
cannot access the file because it is being used by another process.

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
 at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
 at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
 at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
 at java.nio.file.Files.move(Files.java:1395)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
 at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
 at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601)
 at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588)
 at 
kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
 at 
kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
 at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
 at kafka.log.Log.deleteSegments(Log.scala:1161)
 at kafka.log.Log.deleteOldSegments(Log.scala:1156)
 at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228)
 at kafka.log.Log.deleteOldSegments(Log.scala:1222)
 at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854)
 at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852)
 at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 at scala.collection.immutable.List.foreach(List.scala:392)
 at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
 at kafka.log.LogManager.cleanupLogs(LogManager.scala:852)
 at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385)
 at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Suppressed: java.nio.file.FileSystemException: 
D:\data\Kafka\kafka-logs\test4-1\.log -> 
D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process 
cannot access the file because it is being used by another process.

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
 at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
 at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
 at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
 at java.nio.file.Files.move(Files.java:1395)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
 ... 32 more

 

*Topic deletion scenario*

[2018-06-07 15:05:17,805] ERROR Error while renaming dir for test5-1 in log dir 
D:\data\Kafka\kafka-logs 

[jira] [Commented] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process

2019-03-28 Thread Bharat Kondeti (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804243#comment-16804243
 ] 

Bharat Kondeti commented on KAFKA-8172:
---

Pull request link

> FileSystemException: The process cannot access the file because it is being 
> used by another process
> ---
>
> Key: KAFKA-8172
> URL: https://issues.apache.org/jira/browse/KAFKA-8172
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.1, 2.2.0, 2.1.1
> Environment: Windows
>Reporter: Bharat Kondeti
>Priority: Major
> Fix For: 1.1.1, 2.2.0, 2.1.1
>
> Attachments: 
> 0001-Fix-to-close-the-handlers-before-renaming-files-and-.patch
>
>
> Fix to close file handlers before renaming files / directories and open them 
> back if required
> Following are the file renaming scenarios:
>  * Files are renamed to .deleted so they can be deleted
>  * .cleaned files are renamed to .swap as part of Log.replaceSegments flow
>  * .swap files are renamed to original files as part of Log.replaceSegments 
> flow
> Following are the folder renaming scenarios:
>  * When a topic is marked for deletion, folder is renamed
>  * As part of replacing current logs with future logs in LogManager
> In the above scenarios, if file handles are not closed, we get a file access 
> violation exception.
> The idea is to close the logs and file segments before doing a rename, and 
> open them back up if required.
> *Segments Deletion Scenario*
> [2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in 
> dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel)
>  java.nio.file.FileSystemException: 
> D:\data\Kafka\kafka-logs\test4-1\.log -> 
> D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The 
> process cannot access the file because it is being used by another process.
> at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>  at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>  at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
>  at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588)
>  at 
> kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
>  at 
> kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
>  at kafka.log.Log.deleteSegments(Log.scala:1161)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1156)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1222)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:392)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:852)
>  at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385)
>  at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at 

[jira] [Commented] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process

2019-03-28 Thread Bharat Kondeti (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804235#comment-16804235
 ] 

Bharat Kondeti commented on KAFKA-8172:
---

Pull request 

> FileSystemException: The process cannot access the file because it is being 
> used by another process
> ---
>
> Key: KAFKA-8172
> URL: https://issues.apache.org/jira/browse/KAFKA-8172
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.1, 2.2.0, 2.1.1
> Environment: Windows
>Reporter: Bharat Kondeti
>Priority: Major
> Fix For: 1.1.1, 2.2.0, 2.1.1
>
>
> Fix to close file handlers before renaming files / directories and open them 
> back if required
> Following are the file renaming scenarios:
>  * Files are renamed to .deleted so they can be deleted
>  * .cleaned files are renamed to .swap as part of Log.replaceSegments flow
>  * .swap files are renamed to original files as part of Log.replaceSegments 
> flow
> Following are the folder renaming scenarios:
>  * When a topic is marked for deletion, folder is renamed
>  * As part of replacing current logs with future logs in LogManager
> In the above scenarios, if file handles are not closed, we get a file access 
> violation exception.
> The idea is to close the logs and file segments before doing a rename, and 
> open them back up if required.
> *Segments Deletion Scenario*
> [2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in 
> dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel)
>  java.nio.file.FileSystemException: 
> D:\data\Kafka\kafka-logs\test4-1\.log -> 
> D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The 
> process cannot access the file because it is being used by another process.
> at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>  at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>  at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
>  at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588)
>  at 
> kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
>  at 
> kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
>  at kafka.log.Log.deleteSegments(Log.scala:1161)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1156)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1222)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:392)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:852)
>  at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385)
>  at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
>  Suppressed: java.nio.file.FileSystemException: 
> 

[jira] [Created] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process

2019-03-28 Thread Bharat Kondeti (JIRA)
Bharat Kondeti created KAFKA-8172:
-

 Summary: FileSystemException: The process cannot access the file 
because it is being used by another process
 Key: KAFKA-8172
 URL: https://issues.apache.org/jira/browse/KAFKA-8172
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.1, 2.2.0, 1.1.1
 Environment: Windows
Reporter: Bharat Kondeti
 Fix For: 2.1.1, 2.2.0, 1.1.1


Fix to close file handlers before renaming files / directories and open them 
back if required

Following are the file renaming scenarios:
 * Files are renamed to .deleted so they can be deleted
 * .cleaned files are renamed to .swap as part of Log.replaceSegments flow
 * .swap files are renamed to original files as part of Log.replaceSegments flow

Following are the folder renaming scenarios:
 * When a topic is marked for deletion, folder is renamed
 * As part of replacing current logs with future logs in LogManager

In the above scenarios, if file handles are not closed, we get a file access 
violation exception.

The idea is to close the logs and file segments before doing a rename, and 
open them back up if required.

*Segments Deletion Scenario*

[2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in 
dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel)
 java.nio.file.FileSystemException: 
D:\data\Kafka\kafka-logs\test4-1\.log -> 
D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process 
cannot access the file because it is being used by another process.

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
 at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
 at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
 at 
sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
 at java.nio.file.Files.move(Files.java:1395)
 at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
 at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
 at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
 at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601)
 at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588)
 at 
kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
 at 
kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
 at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
 at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
 at kafka.log.Log.deleteSegments(Log.scala:1161)
 at kafka.log.Log.deleteOldSegments(Log.scala:1156)
 at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228)
 at kafka.log.Log.deleteOldSegments(Log.scala:1222)
 at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854)
 at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852)
 at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
 at scala.collection.immutable.List.foreach(List.scala:392)
 at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
 at kafka.log.LogManager.cleanupLogs(LogManager.scala:852)
 at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385)
 at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
 at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
 Suppressed: java.nio.file.FileSystemException: 
D:\data\Kafka\kafka-logs\test4-1\.log -> 
D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The process 
cannot access the file because it is being used by another process.

at sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
 at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
 at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:301)
 at 

[jira] [Commented] (KAFKA-6983) Error while deleting segments - The process cannot access the file because it is being used by another process

2019-03-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804224#comment-16804224
 ] 

ASF GitHub Bot commented on KAFKA-6983:
---

kondetibharat commented on pull request #6517: KAFKA-6983: Fix to close file 
handlers before renaming files/directories
URL: https://github.com/apache/kafka/pull/6517
 
 
   Fix to close file handles before renaming files/directories, and open them 
back if required
   
   Following are the file renaming scenarios:
   - Files are renamed to .deleted so they can be deleted
   - .cleaned files are renamed to .swap as part of Log.replaceSegments flow
   - .swap files are renamed to original files as part of Log.replaceSegments 
flow
   
   Following are the folder renaming scenarios:
   - When a topic is marked for deletion, folder is renamed
   - As part of replacing current logs with future logs in LogManager
   
   In the above scenarios, if file handles are not closed, we get a file access 
violation exception.
   
   The idea is to close the logs and file segments before doing a rename, and 
open them back up if required.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Error while deleting segments - The process cannot access the file because it 
> is being used by another process
> --
>
> Key: KAFKA-6983
> URL: https://issues.apache.org/jira/browse/KAFKA-6983
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 1.1.0
> Environment: Windows 10
>Reporter: wade wu
>Priority: Major
>
> ..
> [2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in 
> dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> D:\data\Kafka\kafka-logs\test4-1\.log -> 
> D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The 
> process cannot access the file because it is being used by another process.
> at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>  at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>  at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
>  at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588)
>  at 
> kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
>  at 
> kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
>  at kafka.log.Log.deleteSegments(Log.scala:1161)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1156)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1222)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:392)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:852)
>  at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385)
>  at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>  at 

[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-03-28 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804179#comment-16804179
 ] 

John Roesler commented on KAFKA-7895:
-

Here's the repro I put together. It'll take me a little while to debug it, but 
I wanted to share my approach.

[https://github.com/vvcephei/suppress-demo]

-John
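
For reference, a compilable sketch of the topology under discussion (the topic 
name "counts-input" is hypothetical; this just mirrors the reporter's snippet 
to show the intended one-final-record-per-key-and-window semantics of 
untilWindowCloses):

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressTopologySketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("counts-input", Consumed.with(Serdes.Integer(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMillis(5)))
               .count(Materialized.with(Serdes.Integer(), Serdes.Long()))
               // untilWindowCloses should emit exactly one final record per
               // key/window, once stream time passes window end + grace period
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               .print(Printed.toSysOut());
        // builder.build() would then be passed to new KafkaStreams(...)
    }
}
{code}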

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-03-28 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804035#comment-16804035
 ] 

John Roesler edited comment on KAFKA-7895 at 3/28/19 6:01 PM:
--

EDIT: actually, nevermind. After replicating your application, I was able to 
reproduce what you're seeing. I'll let you know when I figure out what's going 
on.

 

Thanks!

Also, can you elaborate on how `TimestampedValueTimestampExtractor` behaves?

Thanks,

-John


was (Author: vvcephei):
Thanks!

Also, can you elaborate on how `TimestampedValueTimestampExtractor` behaves?

Thanks,

-John

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2019-03-28 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804148#comment-16804148
 ] 

Guozhang Wang commented on KAFKA-7965:
--

[~enether] Saw another failure: 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/3484

```
kafka.api.ConsumerBounceTest > 
testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup FAILED
08:14:24 java.util.concurrent.ExecutionException: Boxed Error
08:14:24 
08:14:24 Caused by:
08:14:24 java.lang.AssertionError: Received 0, expected at least 68
08:14:24 at org.junit.Assert.fail(Assert.java:89)
08:14:24 at org.junit.Assert.assertTrue(Assert.java:42)
08:14:24 at 
kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:563)
08:14:24 at 
kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$5(ConsumerBounceTest.scala:347)
```

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: Stanislav Kozlovski
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0, 2.2.1
>
>
> To get stable nightly builds for the `2.2` release, I am creating tickets for 
> all observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline

2019-03-28 Thread kehu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8171?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kehu updated KAFKA-8171:

Description: 
Problem: 

In ControllerChannelManager.sendRequestsToBrokers, for STOP_REPLICA requests, 
it tries to group the requests based on the deletePartition flag and the 
callback:

val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => 
!r.deletePartition && r.callback == null)

When both conditions are met, the controller is expected to send only one 
request to the destination broker. However, when adding the requests in 
ReplicaStateMachine, it puts in a non-null callback (_,_)=>(). Therefore, 
replicasToGroup is always empty and the controller always first sends an empty 
request followed by #partitions requests.

 

Fix: set the callback to null in addStopReplicaRequestForBrokers when the 
replica state transitions to offline. PR has been created: 
https://github.com/apache/kafka/pull/6515

  was:
Problem: 

In ControllerChannelManager.sendRequestsToBrokers, for STOP_REPLICA requests, 
it will try to group the requests based on deletePartition flag and callback:

val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => 
!r.deletePartition && r.callback == null)

When both conditions meet, controller is expected to send only one request to 
destination broker. However, when adding the requests in ReplicaStateMachine, 
it's putting in non-null callback (_,_)=>(). Therefore, replicasToGroup is 
always empty and controller will always first sends an empty request followed 
by #partitions requests.

 

Fix: set the callback to null in addStopReplicaRequestForBrokers when replica 
state transits to offline.


> callback needs to be null when addStopReplicaRequestForBrokers when replica 
> state transits to offline
> -
>
> Key: KAFKA-8171
> URL: https://issues.apache.org/jira/browse/KAFKA-8171
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Reporter: kehu
>Priority: Major
>
> Problem: 
> In ControllerChannelManager.sendRequestsToBrokers, for STOP_REPLICA requests, 
> it tries to group the requests based on the deletePartition flag and the 
> callback:
> val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => 
> !r.deletePartition && r.callback == null)
> When both conditions are met, the controller is expected to send only one 
> request to the destination broker. However, when adding the requests in 
> ReplicaStateMachine, it puts in a non-null callback (_,_)=>(). Therefore, 
> replicasToGroup is always empty and the controller always first sends an 
> empty request followed by #partitions requests.
>  
> Fix: set the callback to null in addStopReplicaRequestForBrokers when the 
> replica state transitions to offline. PR has been created: 
> https://github.com/apache/kafka/pull/6515



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8171) callback needs to be null when addStopReplicaRequestForBrokers when replica state transits to offline

2019-03-28 Thread kehu (JIRA)
kehu created KAFKA-8171:
---

 Summary: callback needs to be null when 
addStopReplicaRequestForBrokers when replica state transits to offline
 Key: KAFKA-8171
 URL: https://issues.apache.org/jira/browse/KAFKA-8171
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: kehu


Problem: 

In ControllerChannelManager.sendRequestsToBrokers, for STOP_REPLICA requests, 
it tries to group the requests based on the deletePartition flag and the 
callback:

val (replicasToGroup, replicasToNotGroup) = replicaInfoList.partition(r => 
!r.deletePartition && r.callback == null)

When both conditions are met, the controller is expected to send only one 
request to the destination broker. However, when adding the requests in 
ReplicaStateMachine, it puts in a non-null callback (_,_)=>(). Therefore, 
replicasToGroup is always empty and the controller always first sends an empty 
request followed by #partitions requests.

Fix: set the callback to null in addStopReplicaRequestForBrokers when the 
replica state transitions to offline.
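
To illustrate the grouping condition, a small self-contained Java sketch 
(hypothetical types; the controller code itself is Scala) showing how a 
non-null no-op callback keeps every replica out of the grouped set:

{code:java}
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

public class StopReplicaGroupingSketch {
    // Stand-in for the controller's per-replica bookkeeping.
    static class ReplicaInfo {
        final boolean deletePartition;
        final BiConsumer<Integer, Integer> callback;
        ReplicaInfo(boolean deletePartition, BiConsumer<Integer, Integer> callback) {
            this.deletePartition = deletePartition;
            this.callback = callback;
        }
    }

    public static void main(String[] args) {
        BiConsumer<Integer, Integer> noOp = (a, b) -> { }; // non-null, like (_,_)=>()
        List<ReplicaInfo> replicas = List.of(
                new ReplicaInfo(false, null),   // would be grouped
                new ReplicaInfo(false, noOp));  // never grouped: callback != null

        // Mirrors: replicaInfoList.partition(r => !r.deletePartition && r.callback == null)
        Map<Boolean, List<ReplicaInfo>> split = replicas.stream()
                .collect(Collectors.partitioningBy(
                        r -> !r.deletePartition && r.callback == null));
        System.out.println("replicasToGroup=" + split.get(true).size()
                + ", replicasToNotGroup=" + split.get(false).size());
    }
}
{code}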



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-03-28 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804035#comment-16804035
 ] 

John Roesler commented on KAFKA-7895:
-

Thanks!

Also, can you elaborate on how `TimestampedValueTimestampExtractor` behaves?

Thanks,

-John

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7981) Add Replica Fetcher and Log Cleaner Count Metrics

2019-03-28 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7981?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804012#comment-16804012
 ] 

ASF GitHub Bot commented on KAFKA-7981:
---

viktorsomogyi commented on pull request #6514: KAFKA-7981: Add fetcher and log 
cleaner thread count metrics
URL: https://github.com/apache/kafka/pull/6514
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Replica Fetcher and Log Cleaner Count Metrics
> -
>
> Key: KAFKA-7981
> URL: https://issues.apache.org/jira/browse/KAFKA-7981
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Affects Versions: 2.3.0
>Reporter: Viktor Somogyi-Vass
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>  Labels: kip
>
> On some occasions we detected errors where replica fetcher threads or log 
> cleaner threads died because of an unrecoverable error and caused more 
> serious issues in the brokers (from lagging or offline replicas to filling up 
> disks). It would often help if the monitoring systems attached to Kafka could 
> detect these problems early on, as that would allow a prompt response from 
> the user and a greater chance of capturing the root cause.
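
As a sketch of the kind of gauge this proposes (hypothetical metric name and 
thread list, not the PR's actual code), using the Yammer metrics library the 
broker already ships with:

{code:java}
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import java.util.List;

public class ThreadCountGaugeSketch {
    public static void main(String[] args) {
        // Stand-in for the broker's replica fetcher thread pool.
        List<Thread> fetcherThreads = List.of();

        // A monitoring system can alert when this gauge drops below the
        // configured thread count, catching dead threads early.
        Metrics.newGauge(ThreadCountGaugeSketch.class, "ReplicaFetcherThreadCount",
                new Gauge<Integer>() {
                    @Override
                    public Integer value() {
                        return (int) fetcherThreads.stream()
                                .filter(Thread::isAlive).count();
                    }
                });
    }
}
{code}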



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8106) Remove unnecessary decompression operation when logValidator do validation.

2019-03-28 Thread hackerwin7 (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16804003#comment-16804003
 ] 

hackerwin7 commented on KAFKA-8106:
---

`validateRecord()` is called in every branch of 
`validateMessagesAndAssignOffsets()`.
Any ideas on where to move it so the broker can avoid the decompression?

> Remove unnecessary decompression operation when logValidator  do validation.
> 
>
> Key: KAFKA-8106
> URL: https://issues.apache.org/jira/browse/KAFKA-8106
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
> Environment: Server : 
> cpu:2*16 ; 
> MemTotal : 256G;
> Ethernet controller:Intel Corporation 82599ES 10-Gigabit SFI/SFP+ Network 
> Connection ; 
> SSD.
>Reporter: Flower.min
>Assignee: Flower.min
>Priority: Major
>  Labels: performance
>
> We ran performance tests of Kafka in the specific scenario described below. We 
> built a Kafka cluster with one broker and created topics with different 
> numbers of partitions, then started many producer processes to send large 
> amounts of messages to one of the topics in each test run.
> *_Specific Scenario_*
>  *_1. Main config of Kafka_*
>  # Main broker config: num.network.threads=6; num.io.threads=128; 
> queued.max.requests=500
>  # Number of topic partitions: 50~2000
>  # Size of a single message: 1024 B
>  *_2. Config of KafkaProducer_*
> ||compression.type||linger.ms||batch.size||buffer.memory||
> |lz4|1000ms~5000ms|16KB/10KB/100KB|128MB|
> *_3. Best result of the performance testing_*
> ||Network inflow rate||CPU used (%)||Disk write speed||Production throughput||
> |550MB/s~610MB/s|97%~99%|550MB/s~610MB/s|23,000,000 messages/s|
> *_4. Phenomenon and my doubt_*
> _The upper limit of CPU usage is reached, but the upper limit of the server's 
> network bandwidth is not. *We want to find out what costs so much CPU time, so 
> that we can improve performance and reduce the CPU usage of the Kafka 
> server.*_
>  _*5. Analysis*_
> We analyzed JFR recordings of the Kafka server taken during the performance 
> tests and found that the hot-spot methods are 
> *_"java.io.DataInputStream.readFully(byte[],int,int)"_* and 
> *_"org.apache.kafka.common.record.KafkaLZ4BlockInputStream.read(byte[],int,int)"_*. 
> Checking thread stack information, we also found most of the CPU occupied by 
> many threads busy decompressing messages. We then read the Kafka source code.
> There is a double-layer nested iterator when LogValidator validates every 
> record: each message is decompressed while traversing every RecordBatch 
> iterator. This decompression consumes CPU and hurts total performance. _*The 
> only purpose of decompressing every message is to obtain the total size in 
> bytes of a record and the size in bytes of the record body, when the magic 
> value in use is above 1 and no format conversion or value overwriting is 
> required for compressed messages. This is negative for performance in common 
> usage scenarios.*_ Therefore, we suggest *_removing the unnecessary 
> decompression operation_* when validating compressed messages, when the magic 
> value in use is above 1 and no format conversion or value overwriting is 
> required.
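
As a schematic illustration (hypothetical Batch/Record types, not Kafka's 
actual LogValidator), the double-layer iteration looks like this, with the 
per-record decompression happening in the inner loop:

{code:java}
import java.util.List;

public class ValidationSketch {
    interface Record { int sizeInBytes(); }                 // size known only after decoding
    interface Batch  { List<Record> decompressRecords(); }  // decompression happens here

    // Outer iterator walks batches; inner iterator decompresses every record
    // merely to read sizes for validation -- the CPU cost measured above.
    static long validatedBytes(List<Batch> batches) {
        long total = 0;
        for (Batch batch : batches) {
            for (Record record : batch.decompressRecords()) {
                total += record.sizeInBytes();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        Record r = () -> 1024;                          // a 1 KiB record
        Batch b = () -> List.of(r, r, r);
        System.out.println(validatedBytes(List.of(b))); // prints 3072
    }
}
{code}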



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8103) Kafka SIGSEGV on kafka-network-thread

2019-03-28 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803955#comment-16803955
 ] 

Ismael Juma commented on KAFKA-8103:


Looks like a JVM JIT bug. I haven't seen this one before, so I wonder what's 
special about your environment.

> Kafka SIGSEGV on kafka-network-thread
> -
>
> Key: KAFKA-8103
> URL: https://issues.apache.org/jira/browse/KAFKA-8103
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.1
> Environment: OS 
> Amazon Linux
> Kernel 
> 4.14.97-74.72.amzn1.x86_64 #1 SMP Tue Feb 5 20:59:30 UTC 2019 x86_64 x86_64 
> x86_64 GNU/Linux
> Java
> openjdk version "1.8.0_191"
> OpenJDK Runtime Environment (build 1.8.0_191-b12)
> OpenJDK 64-Bit Server VM (build 25.191-b12, mixed mode)
> AWS Instance Type
> c5.4xlarge
>Reporter: Sean Humbarger
>Priority: Major
> Attachments: hs_err_pid4345.log
>
>
> We have a 4 node cluster (6 topics, 6 consumer groups) that is processing 
> 65,000 messages per second and are seeing SIGSEGV crashes at least once a day 
> (see attachment).  Each broker has six disks attached to it to support the 
> kafka logs.  When the crash occurs, we simply restart kafka and everything 
> seems fine.  We don't see anything out of the ordinary in /var/log/messages 
> or dmesg when the crashes occur.  Thus far, we are unable to predict during 
> the day when the crash will occur or which node it will occur on. 
>  
> The problematic frame is as follows:
> {code:java}
> # Problematic frame:
> # J 8628 C2 
> org.apache.kafka.common.metrics.stats.Max.update(Lorg/apache/kafka/common/metrics/stats/SampledStat$Sample;Lorg/apache/kafka/common/metrics/MetricConfig;DJ)V
>  (13 bytes) @ 0x7ff779f9fca0 [0x7ff779f9fc80+0x20]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8170) To add kafka data at rest encryption

2019-03-28 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-8170?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803940#comment-16803940
 ] 

Sönke Liebau commented on KAFKA-8170:
-

Hi [~ashelke], 

I proposed 
[KIP-317|https://cwiki.apache.org/confluence/display/KAFKA/KIP-317%3A+Add+transparent+data+encryption+functionality]
 a while ago, which would probably cover this, though it would do so 
client-side.

It has been dormant for a while now due to lots of other things to do on my end 
to be honest, but I plan on reviving this very shortly.
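
As a rough illustration of the client-side approach (an assumption sketched for 
this discussion, not KIP-317's actual design), encryption can be done at the 
Serializer level so bytes reach the broker, and thus the log segments, already 
encrypted:

{code:java}
import java.nio.ByteBuffer;
import java.security.SecureRandom;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import org.apache.kafka.common.serialization.Serializer;

public class AesGcmSerializer implements Serializer<byte[]> {
    private final SecretKey key;                  // key management is the hard part
    private final SecureRandom random = new SecureRandom();

    public AesGcmSerializer(SecretKey key) { this.key = key; }

    @Override public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, byte[] plaintext) {
        try {
            byte[] iv = new byte[12];             // fresh IV for every record
            random.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
            byte[] ciphertext = cipher.doFinal(plaintext);
            // Prepend the IV so a matching deserializer can decrypt.
            return ByteBuffer.allocate(iv.length + ciphertext.length)
                             .put(iv).put(ciphertext).array();
        } catch (Exception e) {
            throw new RuntimeException("Encryption failed", e);
        }
    }

    @Override public void close() { }
}
{code}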

 

 

> To add kafka data at rest encryption
> 
>
> Key: KAFKA-8170
> URL: https://issues.apache.org/jira/browse/KAFKA-8170
> Project: Kafka
>  Issue Type: New Feature
>  Components: log
>Reporter: Akash
>Priority: Minor
>  Labels: features, security
>
> Kafka has a mechanism for wire encryption of data.
> But the Kafka data at rest, which exists in the 
> <log.dir>/<topic>-<partition> directories, is still unencrypted.
> These directories contain log files with the actual messages and embedded 
> metadata, and an unauthorised user can still recover messages from these 
> files.
> Adding encryption for this data would be valuable for protecting messages 
> against disk theft and unauthorised user access on servers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8103) Kafka SIGSEGV on kafka-network-thread

2019-03-28 Thread Sean Humbarger (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8103?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803933#comment-16803933
 ] 

Sean Humbarger commented on KAFKA-8103:
---

We are still seeing random JVM crashes.  We've switched over from OpenJDK to 
Oracle 1.8.202 and see the same thing:

 

{code}

#
# A fatal error has been detected by the Java Runtime Environment:
#
#  SIGSEGV (0xb) at pc=0x7f6c9cd85100, pid=4550, tid=0x7f6a64792700
#
# JRE version: Java(TM) SE Runtime Environment (8.0_202-b08) (build 
1.8.0_202-b08)
# Java VM: Java HotSpot(TM) 64-Bit Server VM (25.202-b08 mixed mode linux-amd64 
compressed oops)
# Problematic frame:
# V  [libjvm.so+0x2c7100]  Handle::Handle(Thread*, oopDesc*)+0x0
#
# Failed to write core dump. Core dumps have been disabled. To enable core 
dumping, try "ulimit -c unlimited" before starting Java again
#
# If you would like to submit a bug report, please visit:
#   http://bugreport.java.com/bugreport/crash.jsp
#
---  T H R E A D  ---
Current thread (0x7f6c99279000):  JavaThread "kafka-request-handler-3" 
daemon [_thread_in_vm, id=4984, stack(0x7f6a64692000,0x7f6a64793000)]
siginfo: si_signo: 11 (SIGSEGV), si_code: 2 (SEGV_ACCERR), si_addr: 
0x7f6c9cd85100
Registers:
RAX=0x7f6c9da7b2ce, RBX=0x7f6c99279000, RCX=0x0005502d3250, 
RDX=0x0005502d3250
RSP=0x7f6a647913d8, RBP=0x7f6a64791410, RSI=0x7f6c99279000, 
RDI=0x7f6a647913e8
R8 =0xaa05a64a, R9 =0x000550a3d9b8, R10=0x7f6c9d488af0, 
R11=0x0002
R12=0x7f6a64791470, R13=0x0005502d3250, R14=0x7f6c9da85f8c, 
R15=0x7f6c99279000
RIP=0x7f6c9cd85100, EFLAGS=0x00010246, CSGSFS=0x002b0033, 
ERR=0x0015
  TRAPNO=0x000e
Top of Stack: (sp=0x7f6a647913d8)
0x7f6a647913d8:   7f6c9d488b38 00700070
0x7f6a647913e8:   7f6c8a4e702c 7f67d003adaa
0x7f6a647913f8:    f2e95d57
0x7f6a64791408:   a9e2f767 a9e05757
0x7f6a64791418:   7f6c88e78c88 a9e05757
0x7f6a64791428:   7f6c8a0d149c 0005aa05a64a
0x7f6a64791438:   0005502d3250 0007974aeab8
0x7f6a64791448:   00054f02bab8 00054f17bb60
0x7f6a64791458:   7f6c9dbbacdd 0007974adfe0
0x7f6a64791468:   7f6c9d3cc53f 0003
0x7f6a64791478:   0d70aa32 00054f037260
0x7f6a64791488:   7f6c8ab3cc88 aa147bbef4590578
0x7f6a64791498:   0007a2c82bc0 0007974ae068
0x7f6a647914a8:   0007974adcb0 0007974adfe0
0x7f6a647914b8:   000550a3ddf0 0007974adcf8
0x7f6a647914c8:   0007974adc48 0007a2c83420
0x7f6a647914d8:   7f6cf2e95b69 a9e269d8
0x7f6a647914e8:   7f6c8a2c0934 0007974adba0
0x7f6a647914f8:   7f6c8a000138 7f6a64791550
0x7f6a64791508:   7ffed59c2c60 7f6a64791580
0x7f6a64791518:   0002f4590578 00079d149f38
0x7f6a64791528:   000550a3ddf0 7f6a64791570
0x7f6a64791538:    f4590684
0x7f6a64791548:   00079d1b3490 7f6a64791590
0x7f6a64791558:   7f6c9dbbacdd 0007f3a366fd
0x7f6a64791568:   7f6c9dbbacdd 0007974adb60
0x7f6a64791578:   7f6c9d3cc53f 00011172
0x7f6a64791588:   0d7091ab f4590567
0x7f6a64791598:   7f6c8a887a24 0007a2c82bc0
0x7f6a647915a8:   0007a2c832d0 aa147bbe15bd
0x7f6a647915b8:   f2e95b56974adb48 0005f2e95b67
0x7f6a647915c8:   0007974adb38 0007974adab0
Instructions: (pc=0x7f6c9cd85100)
0x7f6c9cd850e0:   e8 0b 4c 77 00 48 83 c4 30 5b 41 5c 5d c3 66 90
0x7f6c9cd850f0:   f3 c3 66 2e 0f 1f 84 00 00 00 00 00 0f 1f 40 00
0x7f6c9cd85100:   48 85 d2 74 63 55 48 89 e5 41 55 41 54 53 49 89
0x7f6c9cd85110:   fc 48 89 d3 48 83 ec 08 4c 8b ae 38 01 00 00 49
Register to memory mapping:
RAX=0x7f6c9da7b2ce:  in 
/usr/local/java/jdk1.8.0_202/jre/lib/amd64/server/libjvm.so at 
0x7f6c9cabe000
RBX=0x7f6c99279000 is a thread
RCX=0x0005502d3250 is an oop
java.lang.Object
 - klass: 'java/lang/Object'
RDX=0x0005502d3250 is an oop
java.lang.Object
 - klass: 'java/lang/Object'
RSP=0x7f6a647913d8 is pointing into the stack for thread: 0x7f6c99279000
RBP=0x7f6a64791410 is pointing into the stack for thread: 0x7f6c99279000
RSI=0x7f6c99279000 is a thread
RDI=0x7f6a647913e8 is pointing into the stack for thread: 0x7f6c99279000
R8 =0xaa05a64a is an unknown value
R9 =0x000550a3d9b8 is an oop
org.apache.kafka.common.utils.KafkaThread
 - klass: 'org/apache/kafka/common/utils/KafkaThread'
R10=0x7f6c9d488af0:  in 
/usr/local/java/jdk1.8.0_202/jre/lib/amd64/server/libjvm.so at 
0x7f6c9cabe000
R11=0x0002 is an unknown value
R12=0x7f6a64791470 is pointing into the stack for thread: 

[jira] [Created] (KAFKA-8170) To add kafka data at rest encryption

2019-03-28 Thread Akash (JIRA)
Akash created KAFKA-8170:


 Summary: To add kafka data at rest encryption
 Key: KAFKA-8170
 URL: https://issues.apache.org/jira/browse/KAFKA-8170
 Project: Kafka
  Issue Type: New Feature
  Components: log
Reporter: Akash


Kafka has a mechanism for wire encryption of data.
But the Kafka data at rest, which exists in the <log.dir>/<topic>-<partition> 
directories, is still unencrypted.
These directories contain log files with the actual messages and embedded 
metadata, and an unauthorised user can still recover messages from these files.
Adding encryption for this data would be valuable for protecting messages 
against disk theft and unauthorised user access on servers.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7895) Ktable supress operator emitting more than one record for the same key per window

2019-03-28 Thread Andrew Klopper (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803813#comment-16803813
 ] 

Andrew Klopper commented on KAFKA-7895:
---

Hi John Roesler

Yes, my consumer's isolation level is set to read_committed.

Regards
Andrew

> Ktable supress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.2.0, 2.1.2
>
>
> Hi, We are using kstreams to get the aggregated counts per vendor(key) within 
> a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration

2019-03-28 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8165:

Description: 
We have a Kafka Streams 2.1 application. When the Kafka brokers are stable, the 
(largely stateful) application had been consuming ~160 messages per second at a 
sustained rate for several hours.

However, it then started having connection issues to the brokers.
{code:java}
Connection to node 3 (/172.31.36.118:9092) could not be established. Broker may 
not be available. (org.apache.kafka.clients.NetworkClient){code}
Also it began showing a lot of these errors: 
{code:java}
WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
 groupId=stream-processor] 1 partitions have leader brokers without a matching 
listener, including [broker-2-health-check-0] 
(org.apache.kafka.clients.NetworkClient){code}
In fact, the _health-check_ topic is in the broker but not consumed by this 
topology or used in any way by the Streams application (it is just broker 
healthcheck). It does not complain about topics that are actually consumed by 
the topology. 

Some time after these errors (which appear at a rate of 24 per second for ~5 
minutes), the following logs appear:
{code:java}
[2019-03-27 15:14:47,709] WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
 groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient){code}
In between 6 and then 3 lines of "Connection could not be established" error 
messages, 3 of these slipped in:
{code:java}
[2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
partition 15 total records to be restored 17 
(com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener){code}
 
 ... one for each different KV store I have (I still have another KV store that 
does not appear, and a WindowedStore that also does not appear).
 Then I finally see "Restoration Complete" messages (using a logging 
ConsoleGlobalRestoreListener, as in the docs) for all of my stores. So it seems 
it should be fine to resume processing at that point.

Three minutes later, some events get processed, and I see an OOM error:  
{code:java}
java.lang.OutOfMemoryError: GC overhead limit exceeded{code}
 

... so given that it usually processes for hours under the same circumstances, 
I'm wondering whether there is a memory leak in the connection resources or 
somewhere in the handling of this scenario.

Kafka and KafkaStreams 2.1

  was:
Having a Kafka Streams 2.1 application, when Kafka brokers are stable, the 
(largely stateful) application has been consuming ~160 messages per second at a 
sustained rate for several hours. 

However it started having connection issues to the brokers. 


{code:java}
Connection to node 3 (/172.31.36.118:9092) could not be established. Broker may 
not be available. (org.apache.kafka.clients.NetworkClient){code}

Also it began showing a lot of these errors: 


{code:java}
WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
 groupId=stream-processor] 1 partitions have leader brokers without a matching 
listener, including [broker-2-health-check-0] 
(org.apache.kafka.clients.NetworkClient){code}
In fact, the _health-check_ topic is in the broker but not consumed by this 
topology or used in any way by the Streams application (it is just broker 
healthcheck). It does not complain about topics that are actually consumed by 
the topology. 

Some time after these errors (that appear at a rate of 24 appearances per 
second during ~5 minutes), then the following logs appear: 


{code:java}
[2019-03-27 15:14:47,709] WARN [Consumer 
clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
 groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
may not be available. (org.apache.kafka.clients.NetworkClient){code}
In between 6 and then 3 lines of "Connection could not be established" error 
messages, 3 of these ones slipped in: 


[2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
partition 15 total records to be restored 17 
(com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener)
 
... one for each different KV store I have (I still have another KV that does 
not appear, and a WindowedStore store that also does not appear). 
Then I finally see "Restoration Complete" (using a logging 
ConsoleGlobalRestoreListener as in docs) messages for all of my stores. So it 
seems it may be fine now to restart the processing.

Three minutes later, some events get processed, and I see an OOM error:  


java.lang.OutOfMemoryError: GC overhead limit exceeded
 

... so given that it usually allows to process during hours under 

[jira] [Updated] (KAFKA-8165) Streams task causes Out Of Memory after connection issues and store restoration

2019-03-28 Thread Di Campo (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Di Campo updated KAFKA-8165:

Environment: 
3 nodes, 22 topics, 16 partitions per topic, 1 window store, 4 KV stores. 
Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 
application instance, 2 threads per instance.
Kafka 2.1, Kafka Streams 2.1
Amazon Linux.
Scala application, on Docker based on openJdk9. 

  was:
Kafka 2.1, Kafka Streams 2.1
Amazon Linux, on Docker based on wurstmeister/kafka image


> Streams task causes Out Of Memory after connection issues and store 
> restoration
> ---
>
> Key: KAFKA-8165
> URL: https://issues.apache.org/jira/browse/KAFKA-8165
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
> Environment: 3 nodes, 22 topics, 16 partitions per topic, 1 window 
> store, 4 KV stores. 
> Kafka Streams application cluster: 3 AWS t2.large instances (8GB mem). 1 
> application instance, 2 threads per instance.
> Kafka 2.1, Kafka Streams 2.1
> Amazon Linux.
> Scala application, on Docker based on openJdk9. 
>Reporter: Di Campo
>Priority: Major
>
> We have a Kafka Streams 2.1 application. When the Kafka brokers are stable, 
> the (largely stateful) application had been consuming ~160 messages per 
> second at a sustained rate for several hours.
> However, it then started having connection issues to the brokers.
> {code:java}
> Connection to node 3 (/172.31.36.118:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> Also it began showing a lot of these errors: 
> {code:java}
> WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-2-consumer,
>  groupId=stream-processor] 1 partitions have leader brokers without a 
> matching listener, including [broker-2-health-check-0] 
> (org.apache.kafka.clients.NetworkClient){code}
> In fact, the _health-check_ topic is in the broker but not consumed by this 
> topology or used in any way by the Streams application (it is just broker 
> healthcheck). It does not complain about topics that are actually consumed by 
> the topology. 
> Some time after these errors (which appear at a rate of 24 per second for ~5 
> minutes), the following logs appear:
> {code:java}
> [2019-03-27 15:14:47,709] WARN [Consumer 
> clientId=stream-processor-81e1ce17-1765-49f8-9b44-117f983a2d19-StreamThread-1-restore-consumer,
>  groupId=] Connection to node -3 (/ip3:9092) could not be established. Broker 
> may not be available. (org.apache.kafka.clients.NetworkClient){code}
> In between 6 and then 3 lines of "Connection could not be established" error 
> messages, 3 of these slipped in:
> [2019-03-27 15:14:47,723] WARN Started Restoration of visitorCustomerStore 
> partition 15 total records to be restored 17 
> (com.divvit.dp.streams.applications.monitors.ConsoleGlobalRestoreListener)
>  
> ... one for each different KV store I have (I still have another KV store 
> that does not appear, and a WindowedStore that also does not appear).
> Then I finally see "Restoration Complete" messages (using a logging 
> ConsoleGlobalRestoreListener, as in the docs) for all of my stores. So it 
> seems it should be fine to resume processing at that point.
> Three minutes later, some events get processed, and I see an OOM error:  
> java.lang.OutOfMemoryError: GC overhead limit exceeded
>  
> ... so given that it usually processes for hours under the same 
> circumstances, I'm wondering whether there is a memory leak in the connection 
> resources or somewhere in the handling of this scenario.
> Kafka and KafkaStreams 2.1
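
For reference, a logging restore listener like the one the report mentions can 
be sketched as follows (the reporter's com.divvit.dp... class is not public; 
this follows the pattern from the Streams docs):

{code:java}
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class ConsoleGlobalRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition tp, String store,
                               long startOffset, long endOffset) {
        System.out.printf("Started Restoration of %s partition %d, " +
                "total records to be restored %d%n",
                store, tp.partition(), endOffset - startOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition tp, String store,
                                long batchEndOffset, long numRestored) {
        System.out.printf("Restored batch of %d records for %s partition %d%n",
                numRestored, store, tp.partition());
    }

    @Override
    public void onRestoreEnd(TopicPartition tp, String store, long totalRestored) {
        System.out.printf("Restoration Complete for %s partition %d, " +
                "%d records restored%n", store, tp.partition(), totalRestored);
    }
}
// registered via kafkaStreams.setGlobalStateRestoreListener(new ConsoleGlobalRestoreListener())
{code}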



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8169) Wrong topic on streams quick start documentation

2019-03-28 Thread Tcheutchoua Steve (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803656#comment-16803656
 ] 

Tcheutchoua Steve commented on KAFKA-8169:
--

This has been adjusted and patched in this 
[PR|https://github.com/apache/kafka/pull/6513].

> Wrong topic on streams quick start documentation
> 
>
> Key: KAFKA-8169
> URL: https://issues.apache.org/jira/browse/KAFKA-8169
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation
>Affects Versions: 2.1.1
>Reporter: Tcheutchoua Steve
>Priority: Minor
>  Labels: documentation
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> [Kafka Streams Quick 
> Start|https://kafka.apache.org/21/documentation/streams/quickstart]
> Throughout the tutorial, the name of the input topic that was created is 
> `streams-plaintext-input`. However, this is mistakenly changed at some point 
> in the tutorial to `streams-wordcount-input`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8169) Wrong topic on streams quick start documentation

2019-03-28 Thread Tcheutchoua Steve (JIRA)
Tcheutchoua Steve created KAFKA-8169:


 Summary: Wrong topic on streams quick start documentation
 Key: KAFKA-8169
 URL: https://issues.apache.org/jira/browse/KAFKA-8169
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Affects Versions: 2.1.1
Reporter: Tcheutchoua Steve


[Kafka Streams Quick 
Start|https://kafka.apache.org/21/documentation/streams/quickstart]

Throughout the tutorial, the name of the input topic that was created is 
`streams-plaintext-input`. However, this is mistakenly changed at some point in 
the tutorial to `streams-wordcount-input`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.

2019-03-28 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16803630#comment-16803630
 ] 

Ismael Juma commented on KAFKA-6042:


Fixed.

> Kafka Request Handler deadlocks and brings down the cluster.
> 
>
> Key: KAFKA-6042
> URL: https://issues.apache.org/jira/browse/KAFKA-6042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
> Environment: kafka version: 0.11.0.1
> client versions: 0.8.2.1-0.10.2.1
> platform: aws (eu-west-1a)
> nodes: 36 x r4.xlarge
> disk storage: 2.5 tb per node (~73% usage per node)
> topics: 250
> number of partitions: 48k (approx)
> os: ubuntu 14.04
> jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
> 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Ben Corlett
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 0.11.0.2, 1.0.0
>
> Attachments: heapusage.png, thread_dump.txt.gz
>
>
> We have been experiencing a deadlock that consistently happens on the same 
> server within our cluster. This happens multiple times a week currently. It 
> first started happening when we upgraded to 0.11.0.0. Sadly, 0.11.0.1 failed 
> to resolve the issue.
> Sequence of events:
> At a seemingly random time, broker 125 goes into a deadlock. As soon as it is 
> deadlocked, it removes all the ISRs for any partition that it is the leader 
> for.
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> 
> The other nodes fail to connect to the node 125 
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch 
> to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, 
> minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, 
> logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, 
> logStartOffset=0, maxBytes=1048576), XX-80=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, 
> logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, 
> logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, 
> logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> As node 125 removed all the ISRs as it was locking up, a failover for any 
> partition 

[jira] [Updated] (KAFKA-6042) Kafka Request Handler deadlocks and brings down the cluster.

2019-03-28 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-6042:
---
Affects Version/s: (was: 1.0.0)

> Kafka Request Handler deadlocks and brings down the cluster.
> 
>
> Key: KAFKA-6042
> URL: https://issues.apache.org/jira/browse/KAFKA-6042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1
> Environment: kafka version: 0.11.0.1
> client versions: 0.8.2.1-0.10.2.1
> platform: aws (eu-west-1a)
> nodes: 36 x r4.xlarge
> disk storage: 2.5 tb per node (~73% usage per node)
> topics: 250
> number of partitions: 48k (approx)
> os: ubuntu 14.04
> jvm: Java(TM) SE Runtime Environment (build 1.8.0_131-b11), Java HotSpot(TM) 
> 64-Bit Server VM (build 25.131-b11, mixed mode)
>Reporter: Ben Corlett
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 0.11.0.2, 1.0.0
>
> Attachments: heapusage.png, thread_dump.txt.gz
>
>
> We have been experiencing a deadlock that consistently happens on the same 
> server within our cluster. This happens multiple times a week currently. It 
> first started happening when we upgraded to 0.11.0.0. Sadly, 0.11.0.1 failed 
> to resolve the issue.
> Sequence of events:
> At a seemingly random time, broker 125 goes into a deadlock. As soon as it is 
> deadlocked, it removes all the ISRs for any partition that it is the leader 
> for.
> [2017-10-10 00:06:10,061] INFO Partition [XX,24] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,073] INFO Partition [XX,974] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,079] INFO Partition [XX,64] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,081] INFO Partition [XX,21] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,084] INFO Partition [XX,12] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,085] INFO Partition [XX,61] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,086] INFO Partition [XX,53] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,088] INFO Partition [XX,27] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,090] INFO Partition [XX,182] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> [2017-10-10 00:06:10,091] INFO Partition [XX,16] on broker 125: 
> Shrinking ISR from 117,125 to 125 (kafka.cluster.Partition)
> 
> The other nodes fail to connect to the node 125 
> [2017-10-10 00:08:42,318] WARN [ReplicaFetcherThread-0-125]: Error in fetch 
> to broker 125, request (type=FetchRequest, replicaId=101, maxWait=500, 
> minBytes=1, maxBytes=10485760, fetchData={XX-94=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-22=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-58=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-11=(offset=78932482, 
> logStartOffset=50881481, maxBytes=1048576), XX-55=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-19=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-91=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-5=(offset=903857106, 
> logStartOffset=0, maxBytes=1048576), XX-80=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-88=(offset=0, 
> logStartOffset=0, maxBytes=1048576), XX-34=(offset=308, 
> logStartOffset=308, maxBytes=1048576), XX-7=(offset=369990, 
> logStartOffset=369990, maxBytes=1048576), XX-0=(offset=57965795, 
> logStartOffset=0, maxBytes=1048576)}) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 125 was disconnected before the response 
> was read
> at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:93)
> at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:93)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:207)
> at 
> kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
> at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:151)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> As node 125 removed all the ISRs as it was locking up, a failover for any 
> partition without an