[jira] [Updated] (KAFKA-9840) Consumer should not use OffsetForLeaderEpoch without current epoch validation

2020-04-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9840:
---
Description: 
We have observed a case where the consumer attempted to detect truncation with 
the OffsetsForLeaderEpoch API against a broker which had become a zombie. In 
this case, the last epoch known to the consumer was higher than the last epoch 
known to the zombie broker, so the broker returned -1 as both the end offset 
and epoch in the response. The consumer did not check for this in the response, 
which resulted in the following message:

{code}
Truncation detected for partition topic-1 at offset FetchPosition{offset=11859, 
offsetEpoch=Optional[46], currentLeader=LeaderAndEpoch{leader=broker-host (id: 
3 rack: null), epoch=-1}}, resetting offset to the first offset known to 
diverge FetchPosition{offset=-1, offsetEpoch=Optional[-1], 
currentLeader=LeaderAndEpoch{broker-host (id: 3 rack: null), epoch=-1}} 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:414)
{code}

There are a couple of ways the consumer can handle this situation better. 
First, the reason we did not detect the zombie broker is that we did not 
include the current leader epoch in the OffsetForLeaderEpoch request. This was 
likely because of KAFKA-9212. Following that patch, we would not initialize the 
current leader epoch from metadata responses because there are cases where we 
cannot rely on it. But if the client cannot rely on being able to detect 
zombies, then the epoch validation is less useful anyway. So the simple 
solution is to not bother with the validation unless we have a reliable current 
leader epoch.

Second, the consumer needs to check for the case when the returned offset and 
epoch are not defined. In this case, we have to treat this as a normal 
OffsetOutOfRange case and invoke the reset policy. 
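For illustration, a minimal sketch of both points. The names below are hypothetical 
and do not mirror the real Fetcher/SubscriptionState internals:

{code:java}
// Hedged sketch of both points; names are hypothetical stand-ins.
import java.util.Optional;

public class EpochValidationSketch {

    /** Result of an OffsetsForLeaderEpoch lookup for one partition. */
    static final class EndOffsetAndEpoch {
        final long endOffset;   // -1 when the broker does not know the requested epoch
        final int leaderEpoch;  // -1 in the same case (e.g. a zombie broker)

        EndOffsetAndEpoch(long endOffset, int leaderEpoch) {
            this.endOffset = endOffset;
            this.leaderEpoch = leaderEpoch;
        }

        boolean isUndefined() {
            return endOffset < 0 || leaderEpoch < 0;
        }
    }

    /** Point 1: skip validation entirely unless a reliable current leader epoch is known. */
    static boolean shouldValidatePosition(Optional<Integer> reliableCurrentLeaderEpoch) {
        return reliableCurrentLeaderEpoch.isPresent();
    }

    /** Point 2: an undefined offset/epoch means reset, not "truncation detected". */
    static void handleValidationResult(EndOffsetAndEpoch result,
                                       Runnable resetWithPolicy,
                                       Runnable compareWithFetchPosition) {
        if (result.isUndefined()) {
            // Behave like a normal OffsetOutOfRange and invoke the reset policy.
            resetWithPolicy.run();
        } else {
            // Only a defined end offset/epoch is safe to compare against the
            // current fetch position.
            compareWithFetchPosition.run();
        }
    }
}
{code}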



  was:
We have observed a case where the consumer attempted to detect truncation with 
the OffsetsForLeaderEpoch API against a broker which had become a zombie. In 
this case, the last epoch known to the consumer was higher than the last epoch 
known to the zombie broker, so the broker returned -1 as the offset and epoch 
in the response. The consumer did not check for this in the response, which 
resulted in the following message:

{code}
Truncation detected for partition topic-1 at offset FetchPosition{offset=11859, 
offsetEpoch=Optional[46], currentLeader=LeaderAndEpoch{leader=broker-host (id: 
3 rack: null), epoch=-1}}, resetting offset to the first offset known to 
diverge FetchPosition{offset=-1, offsetEpoch=Optional[-1], 
currentLeader=LeaderAndEpoch{broker-host (id: 3 rack: null), epoch=-1}} 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:414)
{code}

There are a couple of ways the consumer can handle this situation better. 
First, the reason we did not detect the zombie broker is that we did not 
include the current leader epoch in the OffsetForLeaderEpoch request. This was 
likely because of KAFKA-9212. Following that patch, we would not initialize the 
current leader epoch from metadata responses because there are cases where we 
cannot rely on it. But if the client cannot rely on being able to detect 
zombies, then the epoch validation is less useful anyway. So the simple 
solution is to not bother with the validation unless we have a reliable current 
leader epoch.

Second, the consumer needs to check for the case when the returned offset and 
epoch are not defined. In this case, we have to treat this as a normal 
OffsetOutOfRange case and invoke the reset policy. 




> Consumer should not use OffsetForLeaderEpoch without current epoch validation
> -
>
> Key: KAFKA-9840
> URL: https://issues.apache.org/jira/browse/KAFKA-9840
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.4.1
>Reporter: Jason Gustafson
>Priority: Major
>
> We have observed a case where the consumer attempted to detect truncation 
> with the OffsetsForLeaderEpoch API against a broker which had become a 
> zombie. In this case, the last epoch known to the consumer was higher than 
> the last epoch known to the zombie broker, so the broker returned -1 as both 
> the end offset and epoch in the response. The consumer did not check for this 
> in the response, which resulted in the following message:
> {code}
> Truncation detected for partition topic-1 at offset 
> FetchPosition{offset=11859, offsetEpoch=Optional[46], 
> currentLeader=LeaderAndEpoch{leader=broker-host (id: 3 rack: null), 
> epoch=-1}}, resetting offset to the first offset known to diverge 
> FetchPosition{offset=-1, offsetEpoch=Optional[-1], 
> currentLeader=LeaderAndEpoch{broker-host (id: 3 rack: null), epoch=-1}} 
> 

[jira] [Updated] (KAFKA-9840) Consumer should not use OffsetForLeaderEpoch without current epoch validation

2020-04-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9840:
---
Affects Version/s: 2.4.1

> Consumer should not use OffsetForLeaderEpoch without current epoch validation
> -
>
> Key: KAFKA-9840
> URL: https://issues.apache.org/jira/browse/KAFKA-9840
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.4.1
>Reporter: Jason Gustafson
>Priority: Major
>
> We have observed a case where the consumer attempted to detect truncation 
> with the OffsetsForLeaderEpoch API against a broker which had become a 
> zombie. In this case, the last epoch known to the consumer was higher than 
> the last epoch known to the zombie broker, so the broker returned -1 as the 
> offset and epoch in the response. The consumer did not check for this in the 
> response, which resulted in the following message:
> {code}
> Truncation detected for partition topic-1 at offset 
> FetchPosition{offset=11859, offsetEpoch=Optional[46], 
> currentLeader=LeaderAndEpoch{leader=broker-host (id: 3 rack: null), 
> epoch=-1}}, resetting offset to the first offset known to diverge 
> FetchPosition{offset=-1, offsetEpoch=Optional[-1], 
> currentLeader=LeaderAndEpoch{broker-host (id: 3 rack: null), epoch=-1}} 
> (org.apache.kafka.clients.consumer.internals.SubscriptionState:414)
> {code}
> There are a couple of ways the consumer can handle this situation better. 
> First, the reason we did not detect the zombie broker is that we did not 
> include the current leader epoch in the OffsetForLeaderEpoch request. This 
> was likely because of KAFKA-9212. Following that patch, we would not 
> initialize the current leader epoch from metadata responses because there are 
> cases where we cannot rely on it. But if the client cannot rely on being able 
> to detect zombies, then the epoch validation is less useful anyway. So the 
> simple solution is to not bother with the validation unless we have a 
> reliable current leader epoch.
> Second, the consumer needs to check for the case when the returned offset and 
> epoch are not defined. In this case, we have to treat this as a normal 
> OffsetOutOfRange case and invoke the reset policy. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9840) Consumer should not use OffsetForLeaderEpoch without current epoch validation

2020-04-08 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9840:
--

 Summary: Consumer should not use OffsetForLeaderEpoch without 
current epoch validation
 Key: KAFKA-9840
 URL: https://issues.apache.org/jira/browse/KAFKA-9840
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: Jason Gustafson


We have observed a case where the consumer attempted to detect truncation with 
the OffsetsForLeaderEpoch API against a broker which had become a zombie. In 
this case, the last epoch known to the consumer was higher than the last epoch 
known to the zombie broker, so the broker returned -1 as the offset and epoch 
in the response. The consumer did not check for this in the response, which 
resulted in the following message:

{code}
Truncation detected for partition topic-1 at offset FetchPosition{offset=11859, 
offsetEpoch=Optional[46], currentLeader=LeaderAndEpoch{leader=broker-host (id: 
3 rack: null), epoch=-1}}, resetting offset to the first offset known to 
diverge FetchPosition{offset=-1, offsetEpoch=Optional[-1], 
currentLeader=LeaderAndEpoch{broker-host (id: 3 rack: null), epoch=-1}} 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:414)
{code}

There are a couple of ways the consumer can handle this situation better. 
First, the reason we did not detect the zombie broker is that we did not 
include the current leader epoch in the OffsetForLeaderEpoch request. This was 
likely because of KAFKA-9212. Following that patch, we would not initialize the 
current leader epoch from metadata responses because there are cases where we 
cannot rely on it. But if the client cannot rely on being able to detect 
zombies, then the epoch validation is less useful anyway. So the simple 
solution is to not bother with the validation unless we have a reliable current 
leader epoch.

Second, the consumer needs to check for the case when the returned offset and 
epoch are not defined. In this case, we have to treat this as a normal 
OffsetOutOfRange case and invoke the reset policy. 





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9839) IllegalStateException on metadata update when broker learns about its new epoch after the controller

2020-04-08 Thread Anna Povzner (Jira)
Anna Povzner created KAFKA-9839:
---

 Summary: IllegalStateException on metadata update when broker 
learns about its new epoch after the controller
 Key: KAFKA-9839
 URL: https://issues.apache.org/jira/browse/KAFKA-9839
 Project: Kafka
  Issue Type: Bug
  Components: controller, core
Affects Versions: 2.3.1
Reporter: Anna Povzner


Broker throws "java.lang.IllegalStateException: Epoch XXX larger than current 
broker epoch YYY"  on UPDATE_METADATA when the controller learns about the 
broker epoch and sends UPDATE_METADATA before KafkaZkCLient.registerBroker 
completes (the broker learns about its new epoch).

Here is the scenario we observed in more detail:
1. ZK session expires on broker 1
2. Broker 1 establishes new session to ZK and creates znode
3. Controller learns about broker 1 and assigns epoch
4. Broker 1 receives UPDATE_METADATA from controller, but it does not know 
about its new epoch yet, so we get an exception:

ERROR [KafkaApi-3] Error when handling request: clientId=1, correlationId=0, 
api=UPDATE_METADATA, body={
.
java.lang.IllegalStateException: Epoch XXX larger than current broker epoch YYY
    at kafka.server.KafkaApis.isBrokerEpochStale(KafkaApis.scala:2725)
    at kafka.server.KafkaApis.handleUpdateMetadataRequest(KafkaApis.scala:320)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:139)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
    at java.lang.Thread.run(Thread.java:748)

5. KafkaZkClient.registerBroker completes on broker 1: "INFO Stat of the 
created znode at /brokers/ids/1"

The result is that the broker has stale metadata for some time.

Possible solutions:
1. Broker returns a more specific error and the controller retries UPDATE_METADATA.
2. Broker accepts UPDATE_METADATA with a larger broker epoch.
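For illustration, a hedged Java sketch of the staleness check described above and of 
how possible solution 2 would relax it. The real check lives in 
KafkaApis.isBrokerEpochStale (Scala); the names here are illustrative only:

{code:java}
// Hedged sketch only; not the actual broker code.
public class BrokerEpochCheckSketch {

    // Current behaviour: a request carrying an epoch newer than the broker's own
    // view is rejected, which is exactly what step 4 above runs into.
    static void requireKnownEpoch(long epochInRequest, long currentBrokerEpoch) {
        if (epochInRequest > currentBrokerEpoch) {
            throw new IllegalStateException("Epoch " + epochInRequest
                    + " larger than current broker epoch " + currentBrokerEpoch);
        }
    }

    // Possible solution 2: accept UPDATE_METADATA with a larger broker epoch and
    // only treat a *smaller* epoch in the request as stale.
    static boolean isStale(long epochInRequest, long currentBrokerEpoch) {
        return epochInRequest < currentBrokerEpoch;
    }
}
{code}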



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]

2020-04-08 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078896#comment-17078896
 ] 

Guozhang Wang commented on KAFKA-9831:
--

Thanks for the report Matthias.

> Failing test: 
> EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
> --
>
> Key: KAFKA-9831
> URL: https://issues.apache.org/jira/browse/KAFKA-9831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: John Roesler
>Priority: Major
> Attachments: one.stdout.txt, two.stdout.txt
>
>
> I've seen this fail twice in a row on the same build, but with different 
> errors. Stacktraces follow; stdout is attached.
> One:
> {noformat}
> java.lang.AssertionError: Did not receive all 40 records from topic 
> singlePartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <40>
>  but: <39> was less than <40>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> 

[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]

2020-04-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078855#comment-17078855
 ] 

Matthias J. Sax commented on KAFKA-9831:


I think I found the root cause of this flakiness: 
[https://github.com/apache/kafka/pull/8443/files#r405910209]

Should be fixed with that PR.

> Failing test: 
> EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
> --
>
> Key: KAFKA-9831
> URL: https://issues.apache.org/jira/browse/KAFKA-9831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: John Roesler
>Priority: Major
> Attachments: one.stdout.txt, two.stdout.txt
>
>
> I've seen this fail twice in a row on the same build, but with different 
> errors. Stacktraces follow; stdout is attached.
> One:
> {noformat}
> java.lang.AssertionError: Did not receive all 40 records from topic 
> singlePartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <40>
>  but: <39> was less than <40>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> 

[jira] [Commented] (KAFKA-9592) Safely abort Producer transactions during application shutdown

2020-04-08 Thread Xiang Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078843#comment-17078843
 ] 

Xiang Zhang commented on KAFKA-9592:


[~bchen225242] Thanks for the reply. Do you mean we cannot know for sure when 
`abortTransaction` would throw another exception, even if we rule out some 
unrecoverable exceptions (ProducerFencedException, OutOfOrderException and 
AuthorizationException) first? If yes, are we expecting users not to call 
`abortTransaction` at all and to always close the producer client in case of a 
KafkaException, unless the exception is thrown from the users' own processing 
logic? Possibly like this:
{code:java}
try {
    producer.beginTransaction();
    // processing logic
    producer.commitTransaction();
} catch (KafkaException e) {
    // in close(), we try to abort the transaction and handle all possible exceptions internally
    producer.close();
} catch (Exception e) {
    // other exceptions due to users' own processing logic
    producer.abortTransaction();
}
producer.close();
{code}

> Safely abort Producer transactions during application shutdown
> --
>
> Key: KAFKA-9592
> URL: https://issues.apache.org/jira/browse/KAFKA-9592
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Assignee: Xiang Zhang
>Priority: Major
>  Labels: help-wanted, needs-kip, newbie
> Fix For: 2.6.0
>
>
> Today if a transactional producer hits a fatal exception, the caller usually 
> catches the exception and handle it by closing the producer, and abort the 
> transaction:
>  
> {code:java}
> try {
>   producer.beginTxn();
>   producer.send(xxx);
>   producer.sendOffsets(xxx);
>   producer.commit();
> } catch (ProducerFenced | UnknownPid e) {
>   ...
>   producer.abortTxn();
>   producer.close();
> }{code}
> This is what the current API suggests users do. Another scenario is an 
> informed shutdown, where users of an EOS producer would also like to end an 
> ongoing transaction before closing the producer, as that is cleaner.
> The tricky part is that `abortTxn` is not a safe call when the producer is 
> already in an error state, which means the user has to nest another try-catch 
> inside the first catch block, making the error handling pretty annoying. 
> There are several ways to make this API robust and guide users toward safe usage:
>  # Internally abort any ongoing transaction within `producer.close`, and 
> document on the `abortTxn` call that users should not call it manually. 
>  # Similar to 1, but add a new `close(boolean abortTxn)` API call in case 
> some users want to handle transaction state by themselves.
>  # Introduce a new abort-transaction API with a boolean flag indicating 
> whether the producer is in an error state, instead of throwing exceptions.
>  # Introduce a public API `isInError` on the producer for users to check 
> before making any transactional API calls.
> I personally favor 1 & 2 most as they are simple and require little to no API 
> change. Considering the change scope, I would still recommend a small KIP.
>  
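For illustration, a hypothetical sketch of how options 1 and 2 above could look from 
the caller's side. Note that close(boolean abortTxn) is only the proposal in this 
ticket and is not an existing Producer API:

{code:java}
// Hypothetical sketch of options 1 and 2; close(boolean abortTxn) does not exist yet.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class SafeAbortSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "safe-abort-demo");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
            producer.commitTransaction();
        } catch (KafkaException e) {
            // Option 1: close() would internally abort any ongoing transaction, so
            // the caller never calls abortTransaction() on a producer that may
            // already be in a fatal error state.
            producer.close();
            // Option 2 (alternative): an explicit flag for callers who want control,
            // e.g. producer.close(true /* abortTxn */);  // proposed, not yet real
        }
        producer.close();  // idempotent; normal shutdown path
    }
}
{code}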



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6145) Warm up new KS instances before migrating tasks - potentially a two phase rebalance

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078841#comment-17078841
 ] 

ASF GitHub Bot commented on KAFKA-6145:
---

vvcephei commented on pull request #8425: KAFKA-6145: KIP-441 Move tasks with 
caught-up destination clients right away
URL: https://github.com/apache/kafka/pull/8425
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Warm up new KS instances before migrating tasks - potentially a two phase 
> rebalance
> ---
>
> Key: KAFKA-6145
> URL: https://issues.apache.org/jira/browse/KAFKA-6145
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> Currently when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large state can take a very long 
> time; even for small state stores, more than a few ms of unavailability can 
> be a deal breaker for microservice use cases.
> One workaround would be to execute the rebalance in two phases:
> 1) start building the state stores on the new node
> 2) once the state stores are fully populated on the new node, only then 
> rebalance the tasks - there will still be a rebalance pause, but it would be 
> greatly reduced
> Relates to: KAFKA-6144 - Allow state stores to serve stale reads during 
> rebalance



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7965) Flaky Test ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup

2020-04-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078837#comment-17078837
 ] 

Matthias J. Sax commented on KAFKA-7965:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5710/testReport/junit/kafka.api/ConsumerBounceTest/testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup/]

> Flaky Test 
> ConsumerBounceTest#testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
> 
>
> Key: KAFKA-7965
> URL: https://issues.apache.org/jira/browse/KAFKA-7965
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer, unit tests
>Affects Versions: 1.1.1, 2.2.0, 2.3.0
>Reporter: Matthias J. Sax
>Assignee: David Jacot
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.3.0
>
>
> To get stable nightly builds for `2.2` release, I create tickets for all 
> observed test failures.
> [https://jenkins.confluent.io/job/apache-kafka-test/job/2.2/21/]
> {quote}java.lang.AssertionError: Received 0, expected at least 68 at 
> org.junit.Assert.fail(Assert.java:88) at 
> org.junit.Assert.assertTrue(Assert.java:41) at 
> kafka.api.ConsumerBounceTest.receiveAndCommit(ConsumerBounceTest.scala:557) 
> at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1(ConsumerBounceTest.scala:320)
>  at 
> kafka.api.ConsumerBounceTest.$anonfun$testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup$1$adapted(ConsumerBounceTest.scala:319)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
> kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup(ConsumerBounceTest.scala:319){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9543) Consumer offset reset after new segment rolling

2020-04-08 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078829#comment-17078829
 ] 

Jason Gustafson commented on KAFKA-9543:


I think this is the same issue as KAFKA-9824. I have been trying to reproduce 
it in a test case, but no luck so far. I found a case which could result in 
unexpected out of range errors in KAFKA-9835, but I'm not sure that's what 
we're looking for here given the coincidence of segment rolling which we have 
now seen in several independent reports. I guess it's at least theoretically 
possible that we get a sequence like this:

1. Broker accepts append and rolls segment
2. Data is written to new segment
3. Consumer fetches from previous log end and hits KAFKA-9835 which results in 
receiving uncommitted data.
4. Consumer fetches again from the new log end offset which results in the out 
of range error
5. Broker updates new log end offset.

This would require both KAFKA-9835 to be hit (or some similar error) combined 
with an edge case like the one that [~brianj] mentioned above. I'm having a 
hard time accepting this though. In my testing I added an explicit sleep 
between the segment append and the update of the log end offset and I still 
couldn't manage to reproduce a sequence like the one above. It's possible I'm 
missing some detail though.

If anyone has a way to reproduce this issue reliably, it would help to have a 
dump from the segments spanning the log roll. The main thing I want to 
understand is whether the "out of range" data is on the new segment or the old 
one.


> Consumer offset reset after new segment rolling
> ---
>
> Key: KAFKA-9543
> URL: https://issues.apache.org/jira/browse/KAFKA-9543
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Rafał Boniecki
>Priority: Major
> Attachments: Untitled.png, image-2020-04-06-17-10-32-636.png
>
>
> After upgrade from kafka 2.1.1 to 2.4.0, I'm experiencing unexpected consumer 
> offset resets.
> Consumer:
> {code:java}
> 2020-02-12T11:12:58.402+01:00 hostname 4a2a39a35a02 
> [2020-02-12T11:12:58,402][INFO 
> ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer 
> clientId=logstash-1, groupId=logstash] Fetch offset 1632750575 is out of 
> range for partition stats-5, resetting offset
> {code}
> Broker:
> {code:java}
> 2020-02-12 11:12:58:400 CET INFO  
> [data-plane-kafka-request-handler-1][kafka.log.Log] [Log partition=stats-5, 
> dir=/kafka4/data] Rolled new log segment at offset 1632750565 in 2 ms.{code}
> All resets are perfectly correlated with rolling new segments at the broker - 
> the segment is rolled first, then, a couple of ms later, the reset on the 
> consumer occurs. Attached is a Grafana graph with consumer lag per partition. 
> All sudden spikes in lag are offset resets due to this bug.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]

2020-04-08 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078817#comment-17078817
 ] 

Sophie Blee-Goldman commented on KAFKA-9831:


Failed twice on the same build:
h3. Error Message

java.lang.AssertionError: Did not receive all 40 records from topic 
singlePartitionOutputTopic within 6 ms Expected: is a value equal to or 
greater than <40> but: <39> was less than <40>
h3. Error Message

java.lang.AssertionError: Expected: <[KeyValue(1, 0), KeyValue(1, 1), 
KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), KeyValue(1, 15), KeyValue(1, 
21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 45)]> but: was <[KeyValue(1, 
0), KeyValue(1, 1), KeyValue(1, 3), KeyValue(1, 6), KeyValue(1, 10), 
KeyValue(1, 15), KeyValue(1, 21), KeyValue(1, 28), KeyValue(1, 36), KeyValue(1, 
45), KeyValue(1, 55), KeyValue(1, 66), KeyValue(1, 78)]>

> Failing test: 
> EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
> --
>
> Key: KAFKA-9831
> URL: https://issues.apache.org/jira/browse/KAFKA-9831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: John Roesler
>Priority: Major
> Attachments: one.stdout.txt, two.stdout.txt
>
>
> I've seen this fail twice in a row on the same build, but with different 
> errors. Stacktraces follow; stdout is attached.
> One:
> {noformat}
> java.lang.AssertionError: Did not receive all 40 records from topic 
> singlePartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <40>
>  but: <39> was less than <40>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at 

[jira] [Commented] (KAFKA-9013) Flaky Test MirrorConnectorsIntegrationTest#testReplication

2020-04-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9013?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078720#comment-17078720
 ] 

Matthias J. Sax commented on KAFKA-9013:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/5691/testReport/junit/org.apache.kafka.connect.mirror/MirrorConnectorsIntegrationTest/testReplication/]

> Flaky Test MirrorConnectorsIntegrationTest#testReplication
> --
>
> Key: KAFKA-9013
> URL: https://issues.apache.org/jira/browse/KAFKA-9013
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Bruno Cadonna
>Priority: Major
>  Labels: flaky-test
>
> h1. Stacktrace:
> {code:java}
> java.lang.AssertionError: Condition not met within timeout 2. Offsets not 
> translated downstream to primary cluster.
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:377)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:354)
>   at 
> org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication(MirrorConnectorsIntegrationTest.java:239)
> {code}
> h1. Standard Error
> {code}
> Standard Error
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:00 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource registered in 
> SERVER runtime does not implement any provider interfaces applicable in the 
> SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored. 
> Oct 09, 2019 11:32:01 PM org.glassfish.jersey.internal.Errors logErrors
> WARNING: The following warnings have been detected: WARNING: The 
> (sub)resource method listLoggers in 
> org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectors in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method createConnector in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains 
> empty path annotation.
> WARNING: The (sub)resource method listConnectorPlugins in 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> contains empty path annotation.
> WARNING: The (sub)resource method serverInfo in 
> org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty 
> path annotation.
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource 
> registered in SERVER runtime does not implement any provider interfaces 
> applicable in the SERVER runtime. Due to constraint configuration problems 
> the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will 
> be ignored. 
> Oct 09, 2019 11:32:02 PM org.glassfish.jersey.internal.inject.Providers 
> checkProviderRuntime
> WARNING: A provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered 
> in SERVER runtime does not implement any provider interfaces applicable in 
> the SERVER runtime. Due to constraint configuration problems the provider 
> org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be 
> ignored. 
> Oct 09, 2019 11:32:02 PM 

[jira] [Commented] (KAFKA-9664) Flaky Test KafkaStreamsTest#testStateThreadClose

2020-04-08 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078660#comment-17078660
 ] 

Sophie Blee-Goldman commented on KAFKA-9664:


Huh...I can't seem to track down where I saw this, and the timeout is fishy. 
Maybe I was running tests on a really old version of the 2.5 branch? 

I guess we should just close this for now

> Flaky Test KafkaStreamsTest#testStateThreadClose
> 
>
> Key: KAFKA-9664
> URL: https://issues.apache.org/jira/browse/KAFKA-9664
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> org.apache.kafka.streams.KafkaStreamsTest > testStateThreadClose FAILED
> 14:23:21 java.lang.AssertionError: Condition not met within timeout 1. Thread never stopped.
> 14:23:21   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:278)
> 14:23:21   at 
> org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:204)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-976) Order-Preserving Mirror Maker Testcase

2020-04-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-976.

Resolution: Won't Fix

As this has been dormant for a long time and no one reacted to my comment I'll 
close this for now.

> Order-Preserving Mirror Maker Testcase
> --
>
> Key: KAFKA-976
> URL: https://issues.apache.org/jira/browse/KAFKA-976
> Project: Kafka
>  Issue Type: Test
>Reporter: Guozhang Wang
>Assignee: John Fung
>Priority: Major
> Attachments: kafka-976-v1.patch
>
>
> A new testcase (5007) for mirror_maker_testsuite is needed for the 
> key-dependent order-preserving mirror maker, this is related to KAFKA-957.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-2493) Add ability to fetch only keys in consumer

2020-04-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-2493?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-2493.
-
Resolution: Won't Fix

As this has been dormant for a long time and no one reacted to my comment I'll 
close this for now.

> Add ability to fetch only keys in consumer
> --
>
> Key: KAFKA-2493
> URL: https://issues.apache.org/jira/browse/KAFKA-2493
> Project: Kafka
>  Issue Type: Wish
>  Components: consumer
>Reporter: Ivan Balashov
>Assignee: Neha Narkhede
>Priority: Minor
>
> Often clients need to find out which offsets contain the necessary data. One 
> of the possible solutions would be to iterate with a small fetch size. 
> However, this still leads to unnecessary data being transmitted in cases 
> where the keys already reference the searched data. The ability to fetch only 
> keys would simplify the search for the necessary offset.
> Of course, there can be other scenarios where a consumer needs only the keys, 
> without the message part.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-2333) Add rename topic support

2020-04-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-2333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-2333.
-
Resolution: Won't Fix

As this has been dormant for a long time and no one reacted to my comment I'll 
close this for now.

> Add rename topic support
> 
>
> Key: KAFKA-2333
> URL: https://issues.apache.org/jira/browse/KAFKA-2333
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Major
>
> Add the ability to change the name of existing topics. 
> This likely needs an associated KIP. This Jira will be updated when one is 
> created.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-3925) Default log.dir=/tmp/kafka-logs is unsafe

2020-04-08 Thread Jira


 [ 
https://issues.apache.org/jira/browse/KAFKA-3925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-3925.
-
Resolution: Won't Fix

As this has been dormant for a long time and no one reacted to my comment I'll 
close this for now.

> Default log.dir=/tmp/kafka-logs is unsafe
> -
>
> Key: KAFKA-3925
> URL: https://issues.apache.org/jira/browse/KAFKA-3925
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 0.10.0.0
> Environment: Various, depends on OS and configuration
>Reporter: Peter Davis
>Priority: Major
>
> Many operating systems are configured to delete files under /tmp.  For 
> example Ubuntu has 
> [tmpreaper|http://manpages.ubuntu.com/manpages/wily/man8/tmpreaper.8.html], 
> others use tmpfs, others delete /tmp on startup. 
> Defaults are OK to make getting started easier but should not be unsafe 
> (risking data loss). 
> Something under /var would be a better default log.dir under *nix.  Or 
> relative to the Kafka bin directory to avoid needing root.  
> If the default cannot be changed, I would suggest printing a special warning 
> to the console on broker startup if log.dir is under /tmp. 
> See [users list 
> thread|http://mail-archives.apache.org/mod_mbox/kafka-users/201607.mbox/%3cCAD5tkZb-0MMuWJqHNUJ3i1+xuNPZ4tnQt-RPm65grxE0=0o...@mail.gmail.com%3e].
>   I've also been bitten by this. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9835) Race condition with concurrent write allows reads above high watermark

2020-04-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9835:
---
Affects Version/s: (was: 2.4.1)
   (was: 2.3.1)
   (was: 2.2.2)
   2.2.0
   2.3.0
   2.4.0

> Race condition with concurrent write allows reads above high watermark
> --
>
> Key: KAFKA-9835
> URL: https://issues.apache.org/jira/browse/KAFKA-9835
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0, 2.3.0, 2.4.0
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.2, 2.5.1
>
>
> Kafka's log implementation serializes all writes using a lock, but allows 
> multiple concurrent reads while that lock is held. The `FileRecords` class 
> contains the core implementation. Reads to the log create logical slices of 
> `FileRecords` which are then passed to the network layer for sending. An 
> abridged version of the implementation of `slice` is provided below:
> {code}
> public FileRecords slice(int position, int size) throws IOException {
> int end = this.start + position + size;
> // handle integer overflow or if end is beyond the end of the file
> if (end < 0 || end >= start + sizeInBytes())
> end = start + sizeInBytes();
> return new FileRecords(file, channel, this.start + position, end, 
> true);
> }
> {code}
> The `size` parameter here is typically derived from the fetch size, but is 
> upper-bounded with respect to the high watermark. The two calls to 
> `sizeInBytes` here are problematic because the size of the file may change in 
> between them. Specifically a concurrent write may increase the size of the 
> file after the first call to `sizeInBytes` but before the second one. In the 
> worst case, when `size` defines the limit of the high watermark, this can 
> lead to a slice containing uncommitted data.
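
For illustration, a hedged sketch of one way to close the race described above by 
reading the size once and deriving both the bound check and the cap from that single 
snapshot. This is illustrative only and not necessarily the actual change in PR #8451:

{code}
public FileRecords slice(int position, int size) throws IOException {
    // Snapshot the size once so that a concurrent append between the bound check
    // and the assignment below cannot extend the slice past the intended limit.
    int currentSizeInBytes = sizeInBytes();
    int end = this.start + position + size;
    // handle integer overflow or if end is beyond the snapshotted end of the file
    if (end < 0 || end >= this.start + currentSizeInBytes)
        end = this.start + currentSizeInBytes;
    return new FileRecords(file, channel, this.start + position, end, true);
}
{code}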



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9838) Add additional log concurrency test cases

2020-04-08 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9838:
--

 Summary: Add additional log concurrency test cases
 Key: KAFKA-9838
 URL: https://issues.apache.org/jira/browse/KAFKA-9838
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson
Assignee: Jason Gustafson


A couple of recent bug fixes affecting log read semantics were due to race 
conditions with concurrent operations: see KAFKA-9807 and KAFKA-9835. We need 
better testing of concurrent operations on the log to know whether there are 
additional problems and to prevent future regressions.
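
As a rough illustration of the kind of concurrent read/write exercise this asks for. 
LogFixture and its methods are hypothetical stand-ins, not an existing test harness:

{code:java}
// Hedged sketch of a concurrent append/read check; names are hypothetical.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LogConcurrencySketch {
    interface LogFixture {
        void append(byte[] record);          // advances the log end offset
        long highWatermark();
        long readLastOffsetUpTo(long limit); // bounded read, like a fetch
    }

    static void run(LogFixture log) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicBoolean done = new AtomicBoolean(false);

        Future<?> writer = pool.submit(() -> {
            while (!done.get())
                log.append(new byte[16]);    // keep appending while reads race
        });
        Future<?> reader = pool.submit(() -> {
            while (!done.get()) {
                long hw = log.highWatermark();
                // A read bounded by the high watermark must never return an offset
                // above it, even under concurrent appends (the KAFKA-9835 symptom).
                if (log.readLastOffsetUpTo(hw) > hw)
                    throw new AssertionError("read above high watermark");
            }
        });

        TimeUnit.SECONDS.sleep(5);
        done.set(true);
        writer.get();
        reader.get();
        pool.shutdown();
    }
}
{code}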



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9835) Race condition with concurrent write allows reads above high watermark

2020-04-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9835:
---
Fix Version/s: 2.5.1
   2.4.2

> Race condition with concurrent write allows reads above high watermark
> --
>
> Key: KAFKA-9835
> URL: https://issues.apache.org/jira/browse/KAFKA-9835
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.2, 2.3.1, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.2, 2.5.1
>
>
> Kafka's log implementation serializes all writes using a lock, but allows 
> multiple concurrent reads while that lock is held. The `FileRecords` class 
> contains the core implementation. Reads to the log create logical slices of 
> `FileRecords` which are then passed to the network layer for sending. An 
> abridged version of the implementation of `slice` is provided below:
> {code}
> public FileRecords slice(int position, int size) throws IOException {
> int end = this.start + position + size;
> // handle integer overflow or if end is beyond the end of the file
> if (end < 0 || end >= start + sizeInBytes())
> end = start + sizeInBytes();
> return new FileRecords(file, channel, this.start + position, end, 
> true);
> }
> {code}
> The `size` parameter here is typically derived from the fetch size, but is 
> upper-bounded with respect to the high watermark. The two calls to 
> `sizeInBytes` here are problematic because the size of the file may change in 
> between them. Specifically a concurrent write may increase the size of the 
> file after the first call to `sizeInBytes` but before the second one. In the 
> worst case, when `size` defines the limit of the high watermark, this can 
> lead to a slice containing uncommitted data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9835) Race condition with concurrent write allows reads above high watermark

2020-04-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9835:
---
Affects Version/s: 2.2.2
   2.3.1
   2.4.1

> Race condition with concurrent write allows reads above high watermark
> --
>
> Key: KAFKA-9835
> URL: https://issues.apache.org/jira/browse/KAFKA-9835
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.2, 2.3.1, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Kafka's log implementation serializes all writes using a lock, but allows 
> multiple concurrent reads while that lock is held. The `FileRecords` class 
> contains the core implementation. Reads to the log create logical slices of 
> `FileRecords` which are then passed to the network layer for sending. An 
> abridged version of the implementation of `slice` is provided below:
> {code}
> public FileRecords slice(int position, int size) throws IOException {
> int end = this.start + position + size;
> // handle integer overflow or if end is beyond the end of the file
> if (end < 0 || end >= start + sizeInBytes())
> end = start + sizeInBytes();
> return new FileRecords(file, channel, this.start + position, end, 
> true);
> }
> {code}
> The `size` parameter here is typically derived from the fetch size, but is 
> upper-bounded with respect to the high watermark. The two calls to 
> `sizeInBytes` here are problematic because the size of the file may change in 
> between them. Specifically a concurrent write may increase the size of the 
> file after the first call to `sizeInBytes` but before the second one. In the 
> worst case, when `size` defines the limit of the high watermark, this can 
> lead to a slice containing uncommitted data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9835) Race condition with concurrent write allows reads above high watermark

2020-04-08 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9835.

Resolution: Fixed

> Race condition with concurrent write allows reads above high watermark
> --
>
> Key: KAFKA-9835
> URL: https://issues.apache.org/jira/browse/KAFKA-9835
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.2, 2.3.1, 2.4.1
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
> Fix For: 2.4.2, 2.5.1
>
>
> Kafka's log implementation serializes all writes using a lock, but allows 
> multiple concurrent reads while that lock is held. The `FileRecords` class 
> contains the core implementation. Reads to the log create logical slices of 
> `FileRecords` which are then passed to the network layer for sending. An 
> abridged version of the implementation of `slice` is provided below:
> {code}
> public FileRecords slice(int position, int size) throws IOException {
> int end = this.start + position + size;
> // handle integer overflow or if end is beyond the end of the file
> if (end < 0 || end >= start + sizeInBytes())
> end = start + sizeInBytes();
> return new FileRecords(file, channel, this.start + position, end, 
> true);
> }
> {code}
> The `size` parameter here is typically derived from the fetch size, but is 
> upper-bounded with respect to the high watermark. The two calls to 
> `sizeInBytes` here are problematic because the size of the file may change in 
> between them. Specifically a concurrent write may increase the size of the 
> file after the first call to `sizeInBytes` but before the second one. In the 
> worst case, when `size` defines the limit of the high watermark, this can 
> lead to a slice containing uncommitted data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9835) Race condition with concurrent write allows reads above high watermark

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078576#comment-17078576
 ] 

ASF GitHub Bot commented on KAFKA-9835:
---

hachikuji commented on pull request #8451: KAFKA-9835; Protect 
`FileRecords.slice` from concurrent write
URL: https://github.com/apache/kafka/pull/8451
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Race condition with concurrent write allows reads above high watermark
> --
>
> Key: KAFKA-9835
> URL: https://issues.apache.org/jira/browse/KAFKA-9835
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Kafka's log implementation serializes all writes using a lock, but allows 
> multiple concurrent reads while that lock is held. The `FileRecords` class 
> contains the core implementation. Reads to the log create logical slices of 
> `FileRecords` which are then passed to the network layer for sending. An 
> abridged version of the implementation of `slice` is provided below:
> {code}
> public FileRecords slice(int position, int size) throws IOException {
> int end = this.start + position + size;
> // handle integer overflow or if end is beyond the end of the file
> if (end < 0 || end >= start + sizeInBytes())
> end = start + sizeInBytes();
> return new FileRecords(file, channel, this.start + position, end, 
> true);
> }
> {code}
> The `size` parameter here is typically derived from the fetch size, but is 
> upper-bounded with respect to the high watermark. The two calls to 
> `sizeInBytes` here are problematic because the size of the file may change in 
> between them. Specifically a concurrent write may increase the size of the 
> file after the first call to `sizeInBytes` but before the second one. In the 
> worst case, when `size` defines the limit of the high watermark, this can 
> lead to a slice containing uncommitted data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]

2020-04-08 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078565#comment-17078565
 ] 

Sophie Blee-Goldman commented on KAFKA-9831:


h3. Error Message

java.lang.AssertionError: Expected: <[KeyValue(0, 0), KeyValue(0, 1), 
KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 
21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45), KeyValue(0, 55), 
KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 105)]> but: was 
<[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 
10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), 
KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 
91), KeyValue(0, 105), KeyValue(0, 55), KeyValue(0, 66)]>

> Failing test: 
> EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
> --
>
> Key: KAFKA-9831
> URL: https://issues.apache.org/jira/browse/KAFKA-9831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: John Roesler
>Priority: Major
> Attachments: one.stdout.txt, two.stdout.txt
>
>
> I've seen this fail twice in a row on the same build, but with different 
> errors. Stacktraces follow; stdout is attached.
> One:
> {noformat}
> java.lang.AssertionError: Did not receive all 40 records from topic 
> singlePartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <40>
>  but: <39> was less than <40>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at 

[jira] [Commented] (KAFKA-6145) Warm up new KS instances before migrating tasks - potentially a two phase rebalance

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6145?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078546#comment-17078546
 ] 

ASF GitHub Bot commented on KAFKA-6145:
---

vvcephei commented on pull request #8409: KAFKA-6145: KIP-441 Pt. 6 Trigger 
probing rebalances until group is stable
URL: https://github.com/apache/kafka/pull/8409
 
 
   
 



> Warm up new KS instances before migrating tasks - potentially a two phase 
> rebalance
> ---
>
> Key: KAFKA-6145
> URL: https://issues.apache.org/jira/browse/KAFKA-6145
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Antony Stubbs
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>
> Currently, when expanding the KS cluster, the new node's partitions will be 
> unavailable during the rebalance, which for large state stores can take a very 
> long time; even for small state stores, a pause of more than a few ms can be a 
> deal breaker for microservice use cases.
> One workaround would be to execute the rebalance in two phases:
> 1) start building the state stores on the new node
> 2) once the state stores are fully populated on the new node, only then 
> rebalance the tasks - there will still be a rebalance pause, but it would be 
> greatly reduced
> Relates to: KAFKA-6144 - Allow state stores to serve stale reads during 
> rebalance
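
As a hedged illustration of the two-phase idea (this is editorial, not from the ticket): KIP-441 addresses it by building warm-up copies of the state before tasks move, and, assuming the configuration names land as described in the KIP (`max.warmup.replicas`, `acceptable.recovery.lag`, `probing.rebalance.interval.ms`), an application could tune the warm-up phase roughly like this:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class WarmupConfigSketch {
    // Values, application id, and bootstrap servers are illustrative only.
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keep a standby so state is already replicated somewhere else.
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
        // Assumed KIP-441 settings: how many warm-up copies may be built at once,
        // how far behind a copy may lag before it can take over a task, and how
        // often to trigger a probing rebalance while warm-up copies catch up.
        props.put("max.warmup.replicas", 2);
        props.put("acceptable.recovery.lag", 10000L);
        props.put("probing.rebalance.interval.ms", 600000L);
        return props;
    }
}
{code}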



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9835) Race condition with concurrent write allows reads above high watermark

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078424#comment-17078424
 ] 

ASF GitHub Bot commented on KAFKA-9835:
---

hachikuji commented on pull request #8451: KAFKA-9835; Protect 
`FileRecords.slice` from concurrent write
URL: https://github.com/apache/kafka/pull/8451
 
 
   If the size of a log changes due to a concurrent write while accessing 
`FileRecords.slice`, then we may exceed the requested size. In particular, this 
would allow a read above the high watermark.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Race condition with concurrent write allows reads above high watermark
> --
>
> Key: KAFKA-9835
> URL: https://issues.apache.org/jira/browse/KAFKA-9835
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Kafka's log implementation serializes all writes using a lock, but allows 
> multiple concurrent reads while that lock is held. The `FileRecords` class 
> contains the core implementation. Reads to the log create logical slices of 
> `FileRecords` which are then passed to the network layer for sending. An 
> abridged version of the implementation of `slice` is provided below:
> {code}
> public FileRecords slice(int position, int size) throws IOException {
> int end = this.start + position + size;
> // handle integer overflow or if end is beyond the end of the file
> if (end < 0 || end >= start + sizeInBytes())
> end = start + sizeInBytes();
> return new FileRecords(file, channel, this.start + position, end, 
> true);
> }
> {code}
> The `size` parameter here is typically derived from the fetch size, but is 
> upper-bounded with respect to the high watermark. The two calls to 
> `sizeInBytes` here are problematic because the size of the file may change in 
> between them. Specifically a concurrent write may increase the size of the 
> file after the first call to `sizeInBytes` but before the second one. In the 
> worst case, when `size` defines the limit of the high watermark, this can 
> lead to a slice containing uncommitted data.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9592) Safely abort Producer transactions during application shutdown

2020-04-08 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078421#comment-17078421
 ] 

Boyang Chen commented on KAFKA-9592:


[~iamabug] Thanks for the check. I think the code example you posted does not 
conflict with our proposed solution, right? `abortTransaction` still throws a 
fatal exception, so if `producer.close()` could abort the transaction internally 
inside the first catch block, that would be good enough IMHO. cc 
[~guozhang]

> Safely abort Producer transactions during application shutdown
> --
>
> Key: KAFKA-9592
> URL: https://issues.apache.org/jira/browse/KAFKA-9592
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Assignee: Xiang Zhang
>Priority: Major
>  Labels: help-wanted, needs-kip, newbie
> Fix For: 2.6.0
>
>
> Today, if a transactional producer hits a fatal exception, the caller usually 
> catches the exception and handles it by aborting the transaction and closing 
> the producer:
>  
> {code:java}
> try {
>   producer.beginTxn();
>   producer.send(xxx);
>   producer.sendOffsets(xxx);
>   producer.commit();
> } catch (ProducerFenced | UnknownPid e) {
>   ...
>   producer.abortTxn();
>   producer.close();
> }{code}
> This is what the current API suggests users do. Another scenario is a graceful 
> shutdown: people with an EOS producer would also like to end an ongoing 
> transaction before closing the producer, as that feels cleaner.
> The tricky part is that `abortTxn` is not a safe call when the producer is 
> already in an error state, which means the user has to nest another try-catch 
> inside the first catch block, making the error handling pretty annoying. 
> There are several ways to make this API robust and guide users toward safe usage:
>  # Internally abort any ongoing transaction within `producer.close`, and 
> document on the `abortTxn` call that users should not do it manually. 
>  # Similar to 1, but add a new `close(boolean abortTxn)` API call in case 
> some users want to handle the transaction state by themselves.
>  # Introduce a new abort transaction API with a boolean flag indicating 
> whether the producer is in an error state, instead of throwing exceptions.
>  # Introduce a public API `isInError` on the producer for users to validate 
> before doing any transactional API calls.
> I personally favor 1 & 2 most, as they are simple and do not require any API 
> change. Considering the change scope, I would still recommend a small KIP.
>  
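
For illustration, a minimal sketch (class, topic, and key names are made up; the producer is assumed to be transactional and already initialized with `initTransactions()`) of the nested try-catch that the description refers to. Option 1 above would let `close()` take care of the abort so the inner try-catch disappears:

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class SafeAbortSketch {
    static void produceOnce(KafkaProducer<String, String> producer) {
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException fatal) {
            // The producer is already in a fatal state, so abortTransaction()
            // itself may throw; hence the extra try-catch the ticket complains about.
            try {
                producer.abortTransaction();
            } catch (KafkaException abortFailure) {
                // nothing left to do but close below
            }
        } finally {
            producer.close();
        }
    }
}
{code}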



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9724) Consumer wrongly ignores fetched records "since it no longer has valid position"

2020-04-08 Thread David Arthur (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078417#comment-17078417
 ] 

David Arthur commented on KAFKA-9724:
-

[~o.muravskiy], I have a patch available here 
https://github.com/apache/kafka/pull/8376. Would you be willing to try it out 
and see if you continue to see hanging in the consumer? 

> Consumer wrongly ignores fetched records "since it no longer has valid 
> position"
> 
>
> Key: KAFKA-9724
> URL: https://issues.apache.org/jira/browse/KAFKA-9724
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.4.0
>Reporter: Oleg Muravskiy
>Priority: Major
> Attachments: consumer.log.xz
>
>
> After upgrading kafka-client to 2.4.0 (while brokers are still at 2.2.0) 
> consumers in a consumer group intermittently stop progressing on assigned 
> partitions, even when there are messages to consume. This is not a permanent 
> condition, as they progress from time to time, but become slower with time, 
> and catch up after restart.
> Here is a sample of 3 consecutive ignored fetches:
> {noformat}
> 2020-03-15 12:08:58,440 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,541 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,549 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,557 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,652 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065631, lastStableOffset = 
> 538065631, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=16380)
> 2020-03-15 12:08:58,659 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,665 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch offsets for partitions [mrt-rrc10-1, 
> mrt-rrc10-6, mrt-rrc22-7] since the broker does not support the required 
> protocol version (introduced in Kafka 2.3)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Added READ_UNCOMMITTED fetch request for partition mrt-rrc10-6 at position 
> FetchPosition{offset=538065584, offsetEpoch=Optional[62], 
> currentLeader=LeaderAndEpoch{leader=node03.kafka:9092 (id: 3 rack: null), 
> epoch=-1}} to node node03.kafka:9092 (id: 3 rack: null)
> 2020-03-15 12:08:58,761 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), 
> implied=(mrt-rrc10-6, mrt-rrc22-7, mrt-rrc10-1)) to broker node03.kafka:9092 
> (id: 3 rack: null)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] org.apache.kafka.clients.Metadata - 
> Updating last seen epoch from null to 62 for partition mrt-rrc10-6
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Fetch READ_UNCOMMITTED at offset 538065584 for partition mrt-rrc10-6 returned 
> fetch data (error=NONE, highWaterMark=538065727, lastStableOffset = 
> 538065727, logStartOffset = 485284547, preferredReadReplica = absent, 
> abortedTransactions = null, recordsSizeInBytes=51864)
> 2020-03-15 12:08:58,770 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Ignoring fetched records for partition mrt-rrc10-6 since it no longer has 
> valid position
> 2020-03-15 12:08:58,808 DEBUG [Thread-6] o.a.k.c.c.i.ConsumerCoordinator - 
> Committed offset 538065584 for partition mrt-rrc10-6
> {noformat}
> After which consumer makes progress:
> {noformat}
> 2020-03-15 12:08:58,871 DEBUG [Thread-6] o.a.k.c.consumer.internals.Fetcher - 
> Skipping validation of fetch 

[jira] [Commented] (KAFKA-7613) Enable javac rawtypes, serial and try xlint warnings

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078360#comment-17078360
 ] 

ASF GitHub Bot commented on KAFKA-7613:
---

tombentley commented on pull request #8450: KAFKA-7613: Enable -Xlint:rawtypes 
in clients, fixing warnings
URL: https://github.com/apache/kafka/pull/8450
 
 
   Fix all existing javac warnings about the use of raw types in clients and add 
-Xlint:rawtypes to the clients' javac options in build.gradle. This 
addresses part of KAFKA-7613, but further work will be needed for the other 
warnings.
   
 



> Enable javac rawtypes, serial and try xlint warnings
> 
>
> Key: KAFKA-7613
> URL: https://issues.apache.org/jira/browse/KAFKA-7613
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Tom Bentley
>Priority: Major
>
> KAFKA-7612 enables all Xlint warnings apart from the following:
> {code:java}
> options.compilerArgs << "-Xlint:-rawtypes"
> options.compilerArgs << "-Xlint:-serial"
> options.compilerArgs << "-Xlint:-try"{code}
> We should fix the issues and enable the warnings.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7613) Enable javac rawtypes, serial and try xlint warnings

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078330#comment-17078330
 ] 

ASF GitHub Bot commented on KAFKA-7613:
---

tombentley commented on pull request #8449: KAFKA-7613: Enable -Xlint:try, 
fixing warnings
URL: https://github.com/apache/kafka/pull/8449
 
 
   Fix all existing javac warnings about try statements and remove 
`-Xlint:-try` from the javac options in build.gradle. This addresses part of 
KAFKA-7613, but further work will be needed for the other warnings.
   
   A couple of `@SuppressWarnings("try")` were needed to avoid having to 
override `close()` on interfaces extending `AutoCloseable`. The problem with 
those is `AutoCloseable.close()` is declared to throw `Exception`, which allows 
`InterruptedException`, which generates a warning.
   
   try-with-resources with an ignored variable causes a warning, and I favoured 
rewriting these rather than suppressing the warning. 
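
For context, a small hypothetical sketch (interface names are not from the Kafka codebase) of the two alternatives described above for the `-Xlint:try` warning about `close()` and `InterruptedException`:

```java
// Option 1: suppress at the type instead of overriding close().
@SuppressWarnings("try")
interface BroadResource extends AutoCloseable {
    // the inherited close() still "throws Exception", which permits InterruptedException
}

// Option 2: override close() with a narrower signature so the warning cannot fire.
interface QuietResource extends AutoCloseable {
    @Override
    void close(); // no checked exceptions, hence no InterruptedException possible
}
```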
   
   
 



> Enable javac rawtypes, serial and try xlint warnings
> 
>
> Key: KAFKA-7613
> URL: https://issues.apache.org/jira/browse/KAFKA-7613
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Tom Bentley
>Priority: Major
>
> KAFKA-7612 enables all Xlint warnings apart from the following:
> {code:java}
> options.compilerArgs << "-Xlint:-rawtypes"
> options.compilerArgs << "-Xlint:-serial"
> options.compilerArgs << "-Xlint:-try"{code}
> We should fix the issues and enable the warnings.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8890) KIP-519: Make SSL context/engine configuration extensible

2020-04-08 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8890.
---
Fix Version/s: 2.6.0
   Resolution: Fixed

> KIP-519: Make SSL context/engine configuration extensible
> -
>
> Key: KAFKA-8890
> URL: https://issues.apache.org/jira/browse/KAFKA-8890
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Maulin Vasavada
>Priority: Minor
> Fix For: 2.6.0
>
>
> This is to track changes for KIP-519: Make SSL context/engine configuration 
> extensible 
> ([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128650952])



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8890) KIP-519: Make SSL context/engine configuration extensible

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078324#comment-17078324
 ] 

ASF GitHub Bot commented on KAFKA-8890:
---

rajinisivaram commented on pull request #8338: KAFKA-8890: KIP-519- Make SSL 
context/engine configuration extensible
URL: https://github.com/apache/kafka/pull/8338
 
 
   
 



> KIP-519: Make SSL context/engine configuration extensible
> -
>
> Key: KAFKA-8890
> URL: https://issues.apache.org/jira/browse/KAFKA-8890
> Project: Kafka
>  Issue Type: New Feature
>  Components: security
>Reporter: Maulin Vasavada
>Priority: Minor
>
> This is to track changes for KIP-519: Make SSL context/engine configuration 
> extensible 
> ([https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=128650952])



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9837) New RPC for notifying controller of failed replica

2020-04-08 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-9837:

Description: This is the tracking ticket for 
[KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
 For the bridge release, brokers should no longer use ZooKeeper to notify the 
controller that a log dir has failed. It should instead use an RPC mechanism.  
(was: This is the tracking ticket for KIP-589. For the bridge release, brokers 
should no longer use ZooKeeper to notify the controller that a log dir has 
failed. It should instead use an RPC mechanism.)

> New RPC for notifying controller of failed replica
> --
>
> Key: KAFKA-9837
> URL: https://issues.apache.org/jira/browse/KAFKA-9837
> Project: Kafka
>  Issue Type: Sub-task
>  Components: controller, core
>Reporter: David Arthur
>Priority: Major
> Fix For: 2.6.0
>
>
> This is the tracking ticket for 
> [KIP-589|https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller].
>  For the bridge release, brokers should no longer use ZooKeeper to notify the 
> controller that a log dir has failed. It should instead use an RPC mechanism.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9837) New RPC for notifying controller of failed replica

2020-04-08 Thread David Arthur (Jira)
David Arthur created KAFKA-9837:
---

 Summary: New RPC for notifying controller of failed replica
 Key: KAFKA-9837
 URL: https://issues.apache.org/jira/browse/KAFKA-9837
 Project: Kafka
  Issue Type: Sub-task
  Components: controller, core
Reporter: David Arthur
 Fix For: 2.6.0


This is the tracking ticket for KIP-589. For the bridge release, brokers should 
no longer use ZooKeeper to notify the controller that a log dir has failed. It 
should instead use an RPC mechanism.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9829) Kafka brokers are unregistered on Zookeeper node replacement

2020-04-08 Thread Pradeep (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078308#comment-17078308
 ] 

Pradeep commented on KAFKA-9829:


Hi [~cricket007] - We are doing high availability testing to validate whether 
the Kafka cluster remains operational after a complete replacement of the 
Zookeeper nodes. Note that we are not replacing all the Zookeeper nodes at once; 
we wait a sufficient amount of time for the Zookeeper nodes to synchronize 
before replacing the next one.

> Kafka brokers are unregistered on Zookeeper node replacement
> 
>
> Key: KAFKA-9829
> URL: https://issues.apache.org/jira/browse/KAFKA-9829
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.2.1
>Reporter: Pradeep
>Priority: Major
>
> We have a Kafka cluster with 3 nodes connected to a Zookeeper (3.4.14) 
> cluster of 3 nodes in AWS. We make use of an auto-scaling group to provision 
> nodes upon failures. We are seeing an issue where the Kafka brokers are 
> getting unregistered when all the Zookeeper nodes are replaced one after the 
> other. Every Zookeeper node is terminated from the AWS console, and we wait 
> for a replacement node to be provisioned with Zookeeper initialized before 
> terminating the next node.
> On every Zookeeper node replacement, the /brokers/ids path shows all the Kafka 
> broker ids in the cluster. But only on the final Zookeeper node replacement, 
> the content in /brokers/ids becomes empty. Because of this issue we are not 
> able to create any new topics or perform any other operations.
> We are seeing the below logs in one of the replaced Zookeeper nodes after all 
> of the original nodes are replaced.
> {{2020-03-26 20:29:20,303 [myid:3] - INFO 
> [[SessionTracker:ZooKeeperServer@355|sessiontracker:ZooKeeperServer@355]] - 
> Expiring session 0x10003b973b50016, timeout of 6000ms exceeded}}
> {{2020-03-26 20:29:20,303 [myid:3] - INFO 
> [[SessionTracker:ZooKeeperServer@355|sessiontracker:ZooKeeperServer@355]] - 
> Expiring session 0x10003b973b5000e, timeout of 6000ms exceeded}}
> {{2020-03-26 20:29:20,303 [myid:3] - INFO 
> [[SessionTracker:ZooKeeperServer@355|sessiontracker:ZooKeeperServer@355]] - 
> Expiring session 0x30003a126690002, timeout of 6000ms exceeded}}
> {{2020-03-26 20:29:20,307 [myid:3] - DEBUG [CommitProcessor:3:DataTree@893] - 
> Deleting ephemeral node /brokers/ids/1002 for session 0x10003b973b50016}}
> {{2020-03-26 20:29:20,307 [myid:3] - DEBUG [CommitProcessor:3:DataTree@893] - 
> Deleting ephemeral node /brokers/ids/1003 for session 0x10003b973b5000e}}
> {{2020-03-26 20:29:20,307 [myid:3] - DEBUG [CommitProcessor:3:DataTree@893] - 
> Deleting ephemeral node /controller for session 0x30003a126690002}}
> {{2020-03-26 20:29:20,307 [myid:3] - DEBUG [CommitProcessor:3:DataTree@893] - 
> Deleting ephemeral node /brokers/ids/1001 for session 0x30003a126690002}}
>  
> I am not sure if the issue is related to KAFKA-5473.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9664) Flaky Test KafkaStreamsTest#testStateThreadClose

2020-04-08 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078293#comment-17078293
 ] 

Bruno Cadonna commented on KAFKA-9664:
--

[~ableegoldman] Where did this flaky test happen? I ask because I had a look 
into it: the line numbers do not match 2.5, and the timeout was changed 
from 1 ms in January 2019 and has never been changed back. It seems like this 
failure comes from a time when {{KafkaStreamsTest#testStateThreadClose()}} was 
still a test that used embedded Kafka.

> Flaky Test KafkaStreamsTest#testStateThreadClose
> 
>
> Key: KAFKA-9664
> URL: https://issues.apache.org/jira/browse/KAFKA-9664
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Sophie Blee-Goldman
>Priority: Major
>  Labels: flaky-test
>
> org.apache.kafka.streams.KafkaStreamsTest > testStateThreadClose 
> FAILED*14:23:21*  java.lang.AssertionError: Condition not met within 
> timeout 1. Thread never stopped.*14:23:21*  at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:278)*14:23:21*
>   at 
> org.apache.kafka.streams.KafkaStreamsTest.testStateThreadClose(KafkaStreamsTest.java:204)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9796) Broker shutdown could be stuck forever under certain conditions

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078228#comment-17078228
 ] 

ASF GitHub Bot commented on KAFKA-9796:
---

dajac commented on pull request #8448: [WIP] KAFKA-9796; Broker shutdown could 
be stuck forever under certain conditions
URL: https://github.com/apache/kafka/pull/8448
 
 
   This patch reworks the SocketServer to always start the acceptor threads 
after the processor threads and to always stop the processor threads before the 
acceptor threads. It also ensures that the processor threads drain their 
`newConnections` queues to unblock acceptors that may be waiting. The acceptors 
still bind during startup; only the processing is delayed further.
   
   The flow looks like this now:
   
   ```
   val socketServer = ...
   
   socketServer.startup(startProcessingRequests = false)
   // Acceptors are bound.
   
   socketServer.startProcessingRequests(authorizerFutures)
   // Acceptors and Processors process new connections and requests
   
   socketServer.stopProcessingRequests()
   // Acceptors and Processors are stopped
   
   socketServer.shutdown()
   // SocketServer is shutdown (metrics, etc.)
   ```
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 



> Broker shutdown could be stuck forever under certain conditions
> ---
>
> Key: KAFKA-9796
> URL: https://issues.apache.org/jira/browse/KAFKA-9796
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> During broker initialisation, the Acceptor threads are started early so that 
> the bound port is known, while starting the processors is delayed to the end 
> of the initialisation sequence. We have found out that the shutdown of a 
> broker could be stuck forever under the following conditions:
>  - the shutdown procedure is started before the processors are started;
>  - the `newConnections` queues of the processors are full; and
>  - an extra new connection has been accepted but can't be queued up in a 
> processor.
> For instance, this could happen if a `NodeExistsException` is raised when the 
> broker tries to register itself in ZK.
> When the above conditions happen, the shutdown triggers the shutdown of the 
> acceptor threads and waits until they are stopped (first thread dump below). 
> If an acceptor has a pending connection which can't be queued up in a 
> processor, it ends up waiting until space is made in the queue to accept the 
> new connection (second thread dump below). As the processors are not started, 
> the `newConnections` queues are never drained, so the acceptor thread is 
> never released.
> *Shutdown wait on acceptor to shutdown*
> {noformat}
> "main" #1 prio=5 os_prio=0 cpu=3626.89ms elapsed=106360.56s 
> tid=0x7f625001c800 nid=0x272 waiting on condition  [0x7f6257ca4000]
>java.lang.Thread.State: WAITING (parking)
>   at jdk.internal.misc.Unsafe.park(java.base@11.0.5/Native Method)
>   - parking to wait for  <0x000689a61800> (a 
> java.util.concurrent.CountDownLatch$Sync)
>   at 
> java.util.concurrent.locks.LockSupport.park(java.base@11.0.5/LockSupport.java:194)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(java.base@11.0.5/AbstractQueuedSynchronizer.java:885)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1039)
>   at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(java.base@11.0.5/AbstractQueuedSynchronizer.java:1345)
>   at 
> java.util.concurrent.CountDownLatch.await(java.base@11.0.5/CountDownLatch.java:232)
>   at kafka.network.AbstractServerThread.shutdown(SocketServer.scala:430)
>   at kafka.network.Acceptor.shutdown(SocketServer.scala:521)
>   at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2(SocketServer.scala:267)
>   at 
> kafka.network.SocketServer.$anonfun$stopProcessingRequests$2$adapted(SocketServer.scala:267)
>   at 
> kafka.network.SocketServer$$Lambda$604/0x000840540840.apply(Unknown 
> Source)
>   at scala.collection.Iterator.foreach(Iterator.scala:941)
>   at scala.collection.Iterator.foreach$(Iterator.scala:941)
>   at 

[jira] [Commented] (KAFKA-9458) Kafka crashed in windows environment

2020-04-08 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-9458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078084#comment-17078084
 ] 

Sönke Liebau commented on KAFKA-9458:
-

Hi [~hirik]

first of all, Windows is not a supported platform for Kafka, which probably 
explains why this bug has been around for so long.
There are a whole bunch of unresolved issues regarding file delete and rename 
behavior on Windows in Jira. Most notable is KAFKA-1194 (but there are about 
10-15 more at a quick glance).
Have you reviewed the progress and discussions on that ticket?



> Kafka crashed in windows environment
> 
>
> Key: KAFKA-9458
> URL: https://issues.apache.org/jira/browse/KAFKA-9458
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.4.0
> Environment: Windows Server 2019
>Reporter: hirik
>Priority: Critical
>  Labels: windows
> Fix For: 2.6.0
>
> Attachments: Windows_crash_fix.patch, logs.zip
>
>
> Hi,
> while I was trying to validate Kafka retention policy, Kafka Server crashed 
> with below exception trace. 
> [2020-01-21 17:10:40,475] INFO [Log partition=test1-3, 
> dir=C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka] 
> Rolled new log segment at offset 1 in 52 ms. (kafka.log.Log)
> [2020-01-21 17:10:40,484] ERROR Error while deleting segments for test1-3 in 
> dir C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka 
> (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
>  at java.base/sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:395)
>  at 
> java.base/sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:292)
>  at java.base/java.nio.file.Files.move(Files.java:1425)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:795)
>  at kafka.log.AbstractIndex.renameTo(AbstractIndex.scala:209)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:497)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1(Log.scala:2206)
>  at kafka.log.Log.$anonfun$deleteSegmentFiles$1$adapted(Log.scala:2206)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.Log.deleteSegmentFiles(Log.scala:2206)
>  at kafka.log.Log.removeAndDeleteSegments(Log.scala:2191)
>  at kafka.log.Log.$anonfun$deleteSegments$2(Log.scala:1700)
>  at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.scala:17)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:2316)
>  at kafka.log.Log.deleteSegments(Log.scala:1691)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1686)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1763)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1753)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3(LogManager.scala:982)
>  at kafka.log.LogManager.$anonfun$cleanupLogs$3$adapted(LogManager.scala:979)
>  at scala.collection.immutable.List.foreach(List.scala:305)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:979)
>  at kafka.log.LogManager.$anonfun$startup$2(LogManager.scala:403)
>  at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>  at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>  at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>  at 
> java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>  at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>  at java.base/java.lang.Thread.run(Thread.java:830)
>  Suppressed: java.nio.file.FileSystemException: 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex
>  -> 
> C:\Users\Administrator\Downloads\kafka\bin\windows\..\..\data\kafka\test1-3\.timeindex.deleted:
>  The process cannot access the file because it is being used by another 
> process.
> at 
> java.base/sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:92)
>  at 
> java.base/sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:103)
> 

[jira] [Updated] (KAFKA-9836) org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = Session moved for /controller_epoch

2020-04-08 Thread Jagadish (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadish updated KAFKA-9836:

Description: 
We have a 3-node Kafka cluster on RHEL.

We are getting the following WARN messages on 2 nodes when using the console 
consumer / console producer:

+Consumer Warning+

[2020-04-08 06:05:02,356] WARN [Consumer clientId=consumer-1, 
groupId=console-consumer-5952] 1 partitions have leader brokers without a 
matching listener, including [DR27SAL_S_EVT_ACT-0] 
(org.apache.kafka.clients.NetworkClient)

+Producer Warning+

[2020-04-08 06:06:39,177] WARN [Producer clientId=console-producer] 2 
partitions have leader brokers without a matching listener, including 
[FirstConsoleTopic-5, FirstConsoleTopic-2] 
(org.apache.kafka.clients.NetworkClient)


 

+Got the following errors in one of our Kafka broker's state change log+

 [2020-04-06 20:08:03,063] TRACE [Controller id=2 epoch=21] Received response 
\{error_code=0} for request UPDATE_METADATA with correlation
 id 2 sent to broker hostname.xx.xxx.com:9092 (id: 2 rack: null) 
(state.change.logger)

[2020-04-06 20:08:03,120] ERROR [Controller id=2 epoch=21] Controller 2 epoch 
21 initiated state change of replica 1 for partition __cons
 umer_offsets-22 from ReplicaDeletionIneligible to OfflineReplica failed 
(state.change.logger)
 org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = 
Session moved for /controller_epoch
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:134)
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
 at 
kafka.zk.KafkaZkClient$.kafka$zk$KafkaZkClient$$unwrapResponseWithControllerEpochCheck(KafkaZkClient.scala:1864)
 at 
kafka.zk.KafkaZkClient.$anonfun$retryRequestsUntilConnected$2(KafkaZkClient.scala:1650)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
 at scala.collection.TraversableLike.map(TraversableLike.scala:237)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
 at scala.collection.AbstractTraversable.map(Traversable.scala:108)
 at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1650)
 at kafka.zk.KafkaZkClient.setTopicPartitionStatesRaw(KafkaZkClient.scala:204)
 at kafka.zk.KafkaZkClient.updateLeaderAndIsr(KafkaZkClient.scala:261)
 at 
kafka.controller.ZkReplicaStateMachine.doRemoveReplicasFromIsr(ReplicaStateMachine.scala:318)
 at 
kafka.controller.ZkReplicaStateMachine.removeReplicasFromIsr(ReplicaStateMachine.scala:282)
 at 
kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:219)
 at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:111)
 at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:110)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
 at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:110)
 at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:42)
 at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:268)
 at kafka.controller.KafkaController.elect(KafkaController.scala:1226)
 at kafka.controller.KafkaController.processReelect(KafkaController.scala:1543)
 at kafka.controller.KafkaController.process(KafkaController.scala:1584)
 at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)

 

  was:
We have 3 node kafka cluster on RHEL.

We are getting following WARN messages on 2 nodes when using console conusmer/ 
console producer 

+Consumer Warning+

[2020-04-08 06:05:02,356] WARN [Consumer clientId=consumer-1, 
groupId=console-consumer-5952] 1 partitions have leader brokers without a 
matching listener, including [DR27SAL_S_EVT_ACT-0] 
(org.apache.kafka.clients.NetworkClient)

+Producer Warning+

[2020-04-08 06:06:39,177] WARN [Producer clientId=console-producer] 2 
partitions have leader brokers without a matching listener, including 
[FirstConsoleTopic-5, FirstConsoleTopic-2] 
(org.apache.kafka.clients.NetworkClient)

)

 

+Got the following errors in one of our Kafka borker's Stage Change log+

 [2020-04-06 20:08:03,063] TRACE [Controller id=2 epoch=21] Received response 
\{error_code=0} for request 

[jira] [Commented] (KAFKA-9836) org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = Session moved for /controller_epoch

2020-04-08 Thread Jagadish (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17078024#comment-17078024
 ] 

Jagadish commented on KAFKA-9836:
-

Need help in understanding what led to these errors, and how to rectify and 
prevent this issue from happening without impacting the data in the topics.

> org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = 
> Session moved for /controller_epoch
> -
>
> Key: KAFKA-9836
> URL: https://issues.apache.org/jira/browse/KAFKA-9836
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.0
>Reporter: Jagadish
>Priority: Critical
> Fix For: 2.3.0
>
>
> We have a 3-node Kafka cluster on RHEL.
> We are getting the following WARN messages on 2 nodes when using the console 
> consumer / console producer:
> +Consumer Warning+
> [2020-04-08 06:05:02,356] WARN [Consumer clientId=consumer-1, 
> groupId=console-consumer-5952] 1 partitions have leader brokers without a 
> matching listener, including [DR27SAL_S_EVT_ACT-0] 
> (org.apache.kafka.clients.NetworkClient)
> +Producer Warning+
> [2020-04-08 06:06:39,177] WARN [Producer clientId=console-producer] 2 
> partitions have leader brokers without a matching listener, including 
> [FirstConsoleTopic-5, FirstConsoleTopic-2] 
> (org.apache.kafka.clients.NetworkClient)
>  
> +Got the following errors in one of our Kafka broker's state change log+
>  [2020-04-06 20:08:03,063] TRACE [Controller id=2 epoch=21] Received response 
> \{error_code=0} for request UPDATE_METADATA with correlation
> id 2 sent to broker cghcts00946.corporate.ge.com:9092 (id: 2 rack: null) 
> (state.change.logger)
> [2020-04-06 20:08:03,120] ERROR [Controller id=2 epoch=21] Controller 2 epoch 
> 21 initiated state change of replica 1 for partition __cons
>  umer_offsets-22 from ReplicaDeletionIneligible to OfflineReplica failed 
> (state.change.logger)
>  org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode 
> = Session moved for /controller_epoch
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:134)
>  at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
>  at 
> kafka.zk.KafkaZkClient$.kafka$zk$KafkaZkClient$$unwrapResponseWithControllerEpochCheck(KafkaZkClient.scala:1864)
>  at 
> kafka.zk.KafkaZkClient.$anonfun$retryRequestsUntilConnected$2(KafkaZkClient.scala:1650)
>  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
>  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>  at scala.collection.TraversableLike.map(TraversableLike.scala:237)
>  at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>  at 
> kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1650)
>  at kafka.zk.KafkaZkClient.setTopicPartitionStatesRaw(KafkaZkClient.scala:204)
>  at kafka.zk.KafkaZkClient.updateLeaderAndIsr(KafkaZkClient.scala:261)
>  at 
> kafka.controller.ZkReplicaStateMachine.doRemoveReplicasFromIsr(ReplicaStateMachine.scala:318)
>  at 
> kafka.controller.ZkReplicaStateMachine.removeReplicasFromIsr(ReplicaStateMachine.scala:282)
>  at 
> kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:219)
>  at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:111)
>  at 
> kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:110)
>  at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
>  at 
> kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:110)
>  at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:42)
>  at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:268)
>  at kafka.controller.KafkaController.elect(KafkaController.scala:1226)
>  at 
> kafka.controller.KafkaController.processReelect(KafkaController.scala:1543)
>  at kafka.controller.KafkaController.process(KafkaController.scala:1584)
>  at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>  at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
>  at 
> kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
>  at 

[jira] [Updated] (KAFKA-9836) org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = Session moved for /controller_epoch

2020-04-08 Thread Jagadish (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadish updated KAFKA-9836:

Description: 
We have a 3-node Kafka cluster on RHEL.

We are getting the following WARN messages on 2 nodes when using the console 
consumer / console producer:

+Consumer Warning+

[2020-04-08 06:05:02,356] WARN [Consumer clientId=consumer-1, 
groupId=console-consumer-5952] 1 partitions have leader brokers without a 
matching listener, including [DR27SAL_S_EVT_ACT-0] 
(org.apache.kafka.clients.NetworkClient)

+Producer Warning+

[2020-04-08 06:06:39,177] WARN [Producer clientId=console-producer] 2 
partitions have leader brokers without a matching listener, including 
[FirstConsoleTopic-5, FirstConsoleTopic-2] 
(org.apache.kafka.clients.NetworkClient)


 

+Got the following errors in one of our Kafka broker's state change log+

 [2020-04-06 20:08:03,063] TRACE [Controller id=2 epoch=21] Received response 
\{error_code=0} for request UPDATE_METADATA with correlation
id 2 sent to broker cghcts00946.corporate.ge.com:9092 (id: 2 rack: null) 
(state.change.logger)

[2020-04-06 20:08:03,120] ERROR [Controller id=2 epoch=21] Controller 2 epoch 
21 initiated state change of replica 1 for partition __cons
 umer_offsets-22 from ReplicaDeletionIneligible to OfflineReplica failed 
(state.change.logger)
 org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = 
Session moved for /controller_epoch
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:134)
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
 at 
kafka.zk.KafkaZkClient$.kafka$zk$KafkaZkClient$$unwrapResponseWithControllerEpochCheck(KafkaZkClient.scala:1864)
 at 
kafka.zk.KafkaZkClient.$anonfun$retryRequestsUntilConnected$2(KafkaZkClient.scala:1650)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
 at scala.collection.TraversableLike.map(TraversableLike.scala:237)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
 at scala.collection.AbstractTraversable.map(Traversable.scala:108)
 at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1650)
 at kafka.zk.KafkaZkClient.setTopicPartitionStatesRaw(KafkaZkClient.scala:204)
 at kafka.zk.KafkaZkClient.updateLeaderAndIsr(KafkaZkClient.scala:261)
 at 
kafka.controller.ZkReplicaStateMachine.doRemoveReplicasFromIsr(ReplicaStateMachine.scala:318)
 at 
kafka.controller.ZkReplicaStateMachine.removeReplicasFromIsr(ReplicaStateMachine.scala:282)
 at 
kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:219)
 at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:111)
 at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:110)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
 at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:110)
 at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:42)
 at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:268)
 at kafka.controller.KafkaController.elect(KafkaController.scala:1226)
 at kafka.controller.KafkaController.processReelect(KafkaController.scala:1543)
 at kafka.controller.KafkaController.process(KafkaController.scala:1584)
 at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)

 

  was:
We have 3 node kafka cluster on RHEL.

We are getting following WARN messages on 2 nodes when using console conusmer/ 
console producer 

+Consumer Warning+

[2020-04-08 06:05:02,356] WARN [Consumer clientId=consumer-1, 
groupId=console-consumer-5952] 1 partitions have leader brokers without a 
matching listener, including [DR27SAL_S_EVT_ACT-0] 
(org.apache.kafka.clients.NetworkClient)

+Producer Warning+

[2020-04-08 06:06:39,177] WARN [Producer clientId=console-producer] 2 
partitions have leader brokers without a matching listener, including 
[FirstConsoleTopic-5, FirstConsoleTopic-2] 
(org.apache.kafka.clients.NetworkClient)

)

 

+Got the following in one of our Kafka borker's Stage Change log+

 

[2020-04-06 20:08:03,120] ERROR [Controller id=2 epoch=21] Controller 2 epoch 
21 initiated state change 

[jira] [Updated] (KAFKA-9836) org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = Session moved for /controller_epoch

2020-04-08 Thread Jagadish (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9836?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jagadish updated KAFKA-9836:

Description: 
We have a 3-node Kafka cluster on RHEL.

We are getting the following WARN messages on 2 nodes when using the console 
consumer / console producer:

+Consumer Warning+

[2020-04-08 06:05:02,356] WARN [Consumer clientId=consumer-1, 
groupId=console-consumer-5952] 1 partitions have leader brokers without a 
matching listener, including [DR27SAL_S_EVT_ACT-0] 
(org.apache.kafka.clients.NetworkClient)

+Producer Warning+

[2020-04-08 06:06:39,177] WARN [Producer clientId=console-producer] 2 
partitions have leader brokers without a matching listener, including 
[FirstConsoleTopic-5, FirstConsoleTopic-2] 
(org.apache.kafka.clients.NetworkClient)


 

+Got the following in one of our Kafka broker's state change log+

 

[2020-04-06 20:08:03,120] ERROR [Controller id=2 epoch=21] Controller 2 epoch 
21 initiated state change of replica 1 for partition __cons
umer_offsets-22 from ReplicaDeletionIneligible to OfflineReplica failed 
(state.change.logger)
org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = 
Session moved for /controller_epoch
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:134)
 at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
 at 
kafka.zk.KafkaZkClient$.kafka$zk$KafkaZkClient$$unwrapResponseWithControllerEpochCheck(KafkaZkClient.scala:1864)
 at 
kafka.zk.KafkaZkClient.$anonfun$retryRequestsUntilConnected$2(KafkaZkClient.scala:1650)
 at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
 at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
 at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
 at scala.collection.TraversableLike.map(TraversableLike.scala:237)
 at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
 at scala.collection.AbstractTraversable.map(Traversable.scala:108)
 at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1650)
 at kafka.zk.KafkaZkClient.setTopicPartitionStatesRaw(KafkaZkClient.scala:204)
 at kafka.zk.KafkaZkClient.updateLeaderAndIsr(KafkaZkClient.scala:261)
 at 
kafka.controller.ZkReplicaStateMachine.doRemoveReplicasFromIsr(ReplicaStateMachine.scala:318)
 at 
kafka.controller.ZkReplicaStateMachine.removeReplicasFromIsr(ReplicaStateMachine.scala:282)
 at 
kafka.controller.ZkReplicaStateMachine.doHandleStateChanges(ReplicaStateMachine.scala:219)
 at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2(ReplicaStateMachine.scala:111)
 at 
kafka.controller.ZkReplicaStateMachine.$anonfun$handleStateChanges$2$adapted(ReplicaStateMachine.scala:110)
 at scala.collection.immutable.Map$Map1.foreach(Map.scala:128)
 at 
kafka.controller.ZkReplicaStateMachine.handleStateChanges(ReplicaStateMachine.scala:110)
 at kafka.controller.ReplicaStateMachine.startup(ReplicaStateMachine.scala:42)
 at 
kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:268)
 at kafka.controller.KafkaController.elect(KafkaController.scala:1226)
 at kafka.controller.KafkaController.processReelect(KafkaController.scala:1543)
 at kafka.controller.KafkaController.process(KafkaController.scala:1584)
 at kafka.controller.QueuedEvent.process(ControllerEventManager.scala:53)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.$anonfun$doWork$1(ControllerEventManager.scala:137)
 at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:31)
 at 
kafka.controller.ControllerEventManager$ControllerEventThread.doWork(ControllerEventManager.scala:137)
 at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:89)

 

> org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = 
> Session moved for /controller_epoch
> -
>
> Key: KAFKA-9836
> URL: https://issues.apache.org/jira/browse/KAFKA-9836
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.0
>Reporter: Jagadish
>Priority: Critical
> Fix For: 2.3.0
>
>
> We have a 3 node Kafka cluster on RHEL.
> We are getting the following WARN messages on 2 nodes when using the console 
> consumer / console producer:
> +Consumer Warning+
> [2020-04-08 06:05:02,356] WARN [Consumer clientId=consumer-1, 
> groupId=console-consumer-5952] 1 partitions have leader brokers without a 
> matching listener, including [DR27SAL_S_EVT_ACT-0] 
> (org.apache.kafka.clients.NetworkClient)
> +Producer Warning+
> [2020-04-08 06:06:39,177] WARN [Producer clientId=console-producer] 2 
> partitions have 

[jira] [Created] (KAFKA-9836) org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = Session moved for /controller_epoch

2020-04-08 Thread Jagadish (Jira)
Jagadish created KAFKA-9836:
---

 Summary: 
org.apache.zookeeper.KeeperException$SessionMovedException: KeeperErrorCode = 
Session moved for /controller_epoch
 Key: KAFKA-9836
 URL: https://issues.apache.org/jira/browse/KAFKA-9836
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 2.3.0
Reporter: Jagadish
 Fix For: 2.3.0






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7613) Enable javac rawtypes, serial and try xlint warnings

2020-04-08 Thread Tom Bentley (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077980#comment-17077980
 ] 

Tom Bentley commented on KAFKA-7613:


[~ijuma] I'll do this over a number of PRs to avoid the code review being too 
burdensome. I assume that where it's not possible to change public interfaces 
(e.g. because they inherit {{AutoCloseable.close()}}, which generates a warning 
about it possibly throwing {{InterruptedException}}), it's OK to suppress 
those warnings without a KIP?
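
For illustration only, a minimal sketch (not code from the Kafka tree) of the kind of 
declaration and local suppression in question, assuming javac's {{try}} lint category 
and {{@SuppressWarnings("try")}}:

{code:java}
// Sketch only, not Kafka code. A resource type whose close() is declared to throw
// InterruptedException typically triggers the -Xlint:try warning at try-with-resources
// use sites; if the public interface cannot change, the warning can be silenced locally.
interface BlockingResource extends AutoCloseable {
    @Override
    void close() throws InterruptedException;
}

class BlockingResourceUser {
    @SuppressWarnings("try")
    void useResource(java.util.function.Supplier<BlockingResource> factory) throws InterruptedException {
        try (BlockingResource resource = factory.get()) {
            // ... use the resource; without the suppression, -Xlint:try would warn here ...
        }
    }
}
{code}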

> Enable javac rawtypes, serial and try xlint warnings
> 
>
> Key: KAFKA-7613
> URL: https://issues.apache.org/jira/browse/KAFKA-7613
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Tom Bentley
>Priority: Major
>
> KAFKA-7612 enables all Xlint warnings apart from the following:
> {code:java}
> options.compilerArgs << "-Xlint:-rawtypes"
> options.compilerArgs << "-Xlint:-serial"
> options.compilerArgs << "-Xlint:-try"{code}
> We should fix the issues and enable the warnings.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9813) __consumer_offsets loaded cost very long time

2020-04-08 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077968#comment-17077968
 ] 

leibo commented on KAFKA-9813:
--

[~cricket007] 

It's a production environment and I cannot access it now.

After I restarted the Kafka cluster, it recovered to normal.

There was no exception found in server.log.

Similar to KAFKA-9065.

> __consumer_offsets loaded cost very long time
> -
>
> Key: KAFKA-9813
> URL: https://issues.apache.org/jira/browse/KAFKA-9813
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Major
> Attachments: 2020-04-03_163556.png
>
>
> __consumer_offsets took a very long time to load.
>  
> After I restarted Kafka (3 node HA), one __consumer_offsets partition took 41256909 
> ms (about 11 hours) to load, and the __consumer_offsets partitions loading after 
> __consumer_offsets-14 could not be loaded at all, so many consumers could not 
> commit offsets or consume.
> restart time:  2020-04-02 19:06
> __consumer_offsets-14 loaded time: 2020-04-03 06:32,  41256909 ms
> other info: 
>     1. there are 72408 consumer groups in the Kafka cluster, and most of them 
> (70498) are empty.
>     2. no exception can be found in server.log
>     3. the disk I/O, CPU and memory load are very low.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-7613) Enable javac rawtypes, serial and try xlint warnings

2020-04-08 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-7613:
--

Assignee: Tom Bentley

> Enable javac rawtypes, serial and try xlint warnings
> 
>
> Key: KAFKA-7613
> URL: https://issues.apache.org/jira/browse/KAFKA-7613
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Tom Bentley
>Priority: Major
>
> KAFKA-7612 enables all Xlint warnings apart from the following:
> {code:java}
> options.compilerArgs << "-Xlint:-rawtypes"
> options.compilerArgs << "-Xlint:-serial"
> options.compilerArgs << "-Xlint:-try"{code}
> We should fix the issues and enable the warnings.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions

2020-04-08 Thread GEORGE LI (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077958#comment-17077958
 ] 

GEORGE LI commented on KAFKA-4084:
--

[~blodsbror]

Probably doing PLE with too many partitions at once is not good.  We have 
scripted it to take all partitions with a preferred leader imbalance (e.g. 
current leader != first replica) where the first replica is in the ISR.

Then we divide them into batches (e.g. 100 partitions per batch, with a 
throttle sleep of about 5-10 seconds between batches).  We also verify each 
batch after submitting it for PLE, e.g. that the ZK node 
//admin/preferred_replica_election is gone.


For the KIP-491 patch, maybe I should write a wrapper for doing PLE, because 
now the logic is not just current_leader != first replica, but: current_leader 
!= 


The batch logic is basically writing the topic/partitions into a JSON file 
(e.g. 100 per batch) and then submitting that batch using the open source 
script `kafka-preferred-replica-election.sh`.  Below is a shell script that 
does PLE for one topic (all partitions).  It still uses ZK to submit the JSON; 
it can be changed to --bootstrap-server.


{code}
$ cat topic_preferred_leader_election.sh
.
name=$1
topic=$2
kafka_cluster_name="${name}"
zk=$(kafka_zk_lookup ${kafka_cluster_name})
json_filename="${name}_${topic}_leader_election.json"

touch ${json_filename}

# Build the JSON file listing every partition of the topic.
echo "{\"partitions\":[" >${json_filename}
IFS=$'\n'
for partition in `/usr/lib/kafka/bin/kafka-run-class.sh kafka.admin.TopicCommand --zookeeper $zk --describe --topic $topic 2>/dev/null |grep Partition:|awk -F "Partition:" '{print $2}'|awk '{print $1}'`
do
  if [ "$partition" == "0" ]
  then
    echo " {\"topic\": \"${topic}\", \"partition\": ${partition}}" >>${json_filename}
  else
    echo ",{\"topic\": \"${topic}\", \"partition\": ${partition}}" >>${json_filename}
  fi
done

echo "]}" >>${json_filename}

# Submit the preferred leader election for this topic.
/usr/lib/kafka/bin/kafka-preferred-replica-election.sh --zookeeper $zk --path-to-json-file ${json_filename} 2>/dev/null
#rm ${json_filename}
{code}


For troubleshooting the timeout, maybe check the ZK node 
//admin/preferred_replica_election and see whether any PLE is still pending 
there.  Maybe it is because of the KIP-491 preferred leader deprioritized/black 
list?  I doubt it, because I have tested that it works.  Does this PLE work 
before applying the KIP-491 patch?

I think the ZooKeeper node has a size limit of 1 MB, so 5000-6000 partitions 
doing PLE all together in one batch might not work.  How about trying one topic 
first, then 100 in a batch?
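
Separately, a rough sketch (not part of this ticket or the KIP-491 patch) of the same 
batch-and-throttle idea using the AdminClient preferred leader election API from KIP-460, 
assuming Kafka 2.4 or newer; the bootstrap address, batch size and sleep are placeholders:

{code:java}
// Rough sketch only; assumes Kafka 2.4+ where Admin.electLeaders (KIP-460) is available.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class BatchedPreferredLeaderElection {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092"); // placeholder

        // Fill with the partitions whose current leader != first replica (first replica in ISR).
        List<TopicPartition> imbalanced = new ArrayList<>();

        int batchSize = 100;                 // e.g. 100 partitions per batch
        long sleepBetweenBatchesMs = 5_000L; // throttle between batches

        try (AdminClient admin = AdminClient.create(props)) {
            for (int i = 0; i < imbalanced.size(); i += batchSize) {
                Set<TopicPartition> batch = new HashSet<>(
                        imbalanced.subList(i, Math.min(i + batchSize, imbalanced.size())));
                // Trigger preferred leader election for this batch and wait for it to complete.
                admin.electLeaders(ElectionType.PREFERRED, batch).all().get();
                Thread.sleep(sleepBetweenBatchesMs);
            }
        }
    }
}
{code}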

> automated leader rebalance causes replication downtime for clusters with too 
> many partitions
> 
>
> Key: KAFKA-4084
> URL: https://issues.apache.org/jira/browse/KAFKA-4084
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1
>Reporter: Tom Crayford
>Priority: Major
>  Labels: reliability
> Fix For: 1.1.0
>
>
> If you enable {{auto.leader.rebalance.enable}} (which is on by default), and 
> you have a cluster with many partitions, there is a severe amount of 
> replication downtime following a restart. This causes 
> `UnderReplicatedPartitions` to fire, and replication is paused.
> This is because the current automated leader rebalance mechanism changes 
> leaders for *all* imbalanced partitions at once, instead of doing it 
> gradually. This effectively stops all replica fetchers in the cluster 
> (assuming there are enough imbalanced partitions), and restarts them. This 
> can take minutes on busy clusters, during which no replication is happening 
> and user data is at risk. Clients with {{acks=-1}} also see issues at this 
> time, because replication is effectively stalled.
> To quote Todd Palino from the mailing list:
> bq. There is an admin CLI command to trigger the preferred replica election 
> manually. There is also a broker configuration “auto.leader.rebalance.enable” 
> which you can set to have the broker automatically perform the PLE when 
> needed. DO NOT USE THIS OPTION. There are serious performance issues when 
> doing so, especially on larger clusters. It needs some development work that 
> has not been fully identified yet.
> This setting is extremely useful for smaller clusters, but with high 
> partition counts causes the huge issues stated above.
> One potential fix could be adding a new configuration for the number of 
> partitions to do automated leader rebalancing for at once, and *stop* once 
> that number of leader rebalances are in flight, until they're done. There may 
> be better mechanisms, and I'd love to hear if anybody has any ideas.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9592) Safely abort Producer transactions during application shutdown

2020-04-08 Thread Xiang Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077954#comment-17077954
 ] 

Xiang Zhang commented on KAFKA-9592:


[~bchen225242], [~guozhang] Could you please take a look at my last comment 
when you are available? Thanks.

> Safely abort Producer transactions during application shutdown
> --
>
> Key: KAFKA-9592
> URL: https://issues.apache.org/jira/browse/KAFKA-9592
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Affects Versions: 2.5.0
>Reporter: Boyang Chen
>Assignee: Xiang Zhang
>Priority: Major
>  Labels: help-wanted, needs-kip, newbie
> Fix For: 2.6.0
>
>
> Today if a transactional producer hits a fatal exception, the caller usually 
> catches the exception and handles it by closing the producer and aborting the 
> transaction:
>  
> {code:java}
> try {
>   producer.beginTxn();
>   producer.send(xxx);
>   producer.sendOffsets(xxx);
>   producer.commit();
> } catch (ProducerFenced | UnknownPid e) {
>   ...
>   producer.abortTxn();
>   producer.close();
> }{code}
> This is what the current API suggests users do. Another scenario is an 
> informed shutdown, where people using an EOS producer would also like to end an 
> ongoing transaction before closing the producer, as that seems cleaner.
> The tricky scenario is that `abortTxn` is not a safe call when the producer 
> is already in an error state, which means the user has to do another try-catch 
> within the first-layer catch block, making the error handling pretty annoying. 
> There are several ways to make this API robust and guide users to safe usage:
>  # Internally abort any ongoing transaction within `producer.close`, and 
> add a comment on the `abortTxn` call to warn users not to do it manually. 
>  # Similar to 1, but add a new `close(boolean abortTxn)` API call in case 
> some users want to handle the transaction state themselves.
>  # Introduce a new abort transaction API with a boolean flag indicating 
> whether the producer is in an error state, instead of throwing exceptions.
>  # Introduce a public API `isInError` on the producer for users to validate before 
> doing any transactional API calls.
> I personally favor 1 & 2 most as they are simple and do not require any API 
> change. Considering the change scope, I would still recommend a small KIP.
>  
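
A minimal sketch of the nested try/catch described above, using the current public 
method names (beginTransaction/abortTransaction) rather than the pseudocode; the 
producer configuration and record are assumed to exist elsewhere:

{code:java}
// Sketch only: the nested try/catch callers currently need, because abortTransaction()
// itself throws if the producer is already in a fatal error state.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;

public class SafeAbortExample {
    static void sendTransactionally(KafkaProducer<String, String> producer,
                                    ProducerRecord<String, String> record) {
        try {
            producer.beginTransaction();
            producer.send(record);
            producer.commitTransaction();
        } catch (KafkaException e) {
            try {
                producer.abortTransaction();     // may throw again on fatal errors
            } catch (KafkaException abortFailure) {
                // nothing more can be done with this producer
            } finally {
                producer.close();
            }
        }
    }
}
{code}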



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9813) __consumer_offsets loaded cost very long time

2020-04-08 Thread Jordan Moore (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077931#comment-17077931
 ] 

Jordan Moore commented on KAFKA-9813:
-

Also, if you could [dump the log 
segments|https://cwiki.apache.org/confluence/display/KAFKA/System+Tools#SystemTools-DumpLogSegment], 
that would be appreciated.

> __consumer_offsets loaded cost very long time
> -
>
> Key: KAFKA-9813
> URL: https://issues.apache.org/jira/browse/KAFKA-9813
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Major
> Attachments: 2020-04-03_163556.png
>
>
> __consumer_offsets took a very long time to load.
>  
> After I restarted Kafka (3 node HA), one __consumer_offsets partition took 41256909 
> ms (about 11 hours) to load, and the __consumer_offsets partitions loading after 
> __consumer_offsets-14 could not be loaded at all, so many consumers could not 
> commit offsets or consume.
> restart time:  2020-04-02 19:06
> __consumer_offsets-14 loaded time: 2020-04-03 06:32,  41256909 ms
> other info: 
>     1. there are 72408 consumer groups in the Kafka cluster, and most of them 
> (70498) are empty.
>     2. no exception can be found in server.log
>     3. the disk I/O, CPU and memory load are very low.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9813) __consumer_offsets loaded cost very long time

2020-04-08 Thread Jordan Moore (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077930#comment-17077930
 ] 

Jordan Moore commented on KAFKA-9813:
-

[~lbdai3190]

Thanks, but that wasn't what I was looking for. Please run
{code}du{code}
or
{code}ls -lh{code}
on the
{code}kafka.log.dirs{code}
directories.

> __consumer_offsets loaded cost very long time
> -
>
> Key: KAFKA-9813
> URL: https://issues.apache.org/jira/browse/KAFKA-9813
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Major
> Attachments: 2020-04-03_163556.png
>
>
> __consumer_offsets took a very long time to load.
>  
> After I restarted Kafka (3 node HA), one __consumer_offsets partition took 41256909 
> ms (about 11 hours) to load, and the __consumer_offsets partitions loading after 
> __consumer_offsets-14 could not be loaded at all, so many consumers could not 
> commit offsets or consume.
> restart time:  2020-04-02 19:06
> __consumer_offsets-14 loaded time: 2020-04-03 06:32,  41256909 ms
> other info: 
>     1. there are 72408 consumer groups in the Kafka cluster, and most of them 
> (70498) are empty.
>     2. no exception can be found in server.log
>     3. the disk I/O, CPU and memory load are very low.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9825) Kafka protocol BNF format should have some way to display tagged fields

2020-04-08 Thread sivagurunathan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077903#comment-17077903
 ] 

sivagurunathan commented on KAFKA-9825:
---

I would love to work on this issue. Could you please assign this task to me? 
Thanks!

> Kafka protocol BNF format should have some way to display tagged fields
> ---
>
> Key: KAFKA-9825
> URL: https://issues.apache.org/jira/browse/KAFKA-9825
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin McCabe
>Priority: Major
>  Labels: newbie
>
> The Kafka protocol BNF format should have some way to display tagged fields.  
> But in clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java, 
> there is no special treatment for fields with a tag.  Maybe something like 
> FIELD_NAME<1> (where 1 is the tag number) would work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9804) Extract ConsumerPerform config into one file

2020-04-08 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9804?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077901#comment-17077901
 ] 

ASF GitHub Bot commented on KAFKA-9804:
---

jiameixie commented on pull request #8446: KAFKA-9804:Extract consumer configs 
out of PerfConfig
URL: https://github.com/apache/kafka/pull/8446
 
 
   Configs for the producer have been extracted out of PerfConfig in
   https://github.com/apache/kafka/pull/3613/commits. The remaining
   configs in PerfConfig are consumer configs, and ConsumerPerformance
   also has configs for the consumer. Separating these configs into two
   classes is not concise, so we can put all of them into the class
   ConsumerPerformance.ConsumerPerfConfig.
   
   Change-Id: I38e2fdf7c7930af786e03cbb5d9deb2f4ef50dee
   Signed-off-by: Jiamei Xie 
   
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extract ConsumerPerform config into one file
> 
>
> Key: KAFKA-9804
> URL: https://issues.apache.org/jira/browse/KAFKA-9804
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>
> Configs for the producer have been extracted out of PerfConfig in 
> https://github.com/apache/kafka/pull/3613/commits. The remaining configs in 
> PerfConfig are consumer configs, and ConsumerPerformance also has configs for 
> the consumer. Separating these configs into two classes is not concise, so we 
> can put all of them into the class ConsumerPerformance.ConsumerPerfConfig.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9835) Race condition with concurrent write allows reads above high watermark

2020-04-08 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9835:
--

 Summary: Race condition with concurrent write allows reads above 
high watermark
 Key: KAFKA-9835
 URL: https://issues.apache.org/jira/browse/KAFKA-9835
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


Kafka's log implementation serializes all writes using a lock, but allows 
multiple concurrent reads while that lock is held. The `FileRecords` class 
contains the core implementation. Reads to the log create logical slices of 
`FileRecords` which are then passed to the network layer for sending. An 
abridged version of the implementation of `slice` is provided below:

{code}
public FileRecords slice(int position, int size) throws IOException {
int end = this.start + position + size;
// handle integer overflow or if end is beyond the end of the file
if (end < 0 || end >= start + sizeInBytes())
end = start + sizeInBytes();
return new FileRecords(file, channel, this.start + position, end, true);
}
{code}

The `size` parameter here is typically derived from the fetch size, but is 
upper-bounded with respect to the high watermark. The two calls to 
`sizeInBytes` here are problematic because the size of the file may change in 
between them. Specifically a concurrent write may increase the size of the file 
after the first call to `sizeInBytes` but before the second one. In the worst 
case, when `size` defines the limit of the high watermark, this can lead to a 
slice containing uncommitted data.
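
One obvious mitigation, sketched below (a sketch only, not necessarily the committed 
fix), is to read the size a single time and derive both the overflow check and the 
fallback bound from that one value:

{code}
public FileRecords slice(int position, int size) throws IOException {
    // Read the current size once, so a concurrent append cannot change the bound
    // between the overflow check and the fallback assignment below.
    int currentSizeInBytes = sizeInBytes();
    int end = this.start + position + size;
    // handle integer overflow or if end is beyond the end of the file
    if (end < 0 || end >= start + currentSizeInBytes)
        end = start + currentSizeInBytes;
    return new FileRecords(file, channel, this.start + position, end, true);
}
{code}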



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8172) FileSystemException: The process cannot access the file because it is being used by another process

2020-04-08 Thread Vincent Claeysen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077888#comment-17077888
 ] 

Vincent Claeysen commented on KAFKA-8172:
-

2.4 has the same problem. How can it be fixed?

> FileSystemException: The process cannot access the file because it is being 
> used by another process
> ---
>
> Key: KAFKA-8172
> URL: https://issues.apache.org/jira/browse/KAFKA-8172
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.1, 2.2.0, 2.1.1
> Environment: Windows
>Reporter: Bharat Kondeti
>Priority: Major
> Fix For: 1.1.1, 2.2.0, 2.1.1
>
> Attachments: 
> 0001-Fix-to-close-the-handlers-before-renaming-files-and-.patch
>
>
> Fix to close file handles before renaming files / directories and open them 
> back if required
> Following are the file renaming scenarios:
>  * Files are renamed to .deleted so they can be deleted
>  * .cleaned files are renamed to .swap as part of Log.replaceSegments flow
>  * .swap files are renamed to original files as part of Log.replaceSegments 
> flow
> Following are the folder renaming scenarios:
>  * When a topic is marked for deletion, folder is renamed
>  * As part of replacing current logs with future logs in LogManager
> In the above scenarios, if file handles are not closed, we get a file access 
> violation exception.
> The idea is to close the logs and file segments before doing a rename and open 
> them back up if required.
> *Segments Deletion Scenario*
> [2018-06-01 17:00:07,566] ERROR Error while deleting segments for test4-1 in 
> dir D:\data\Kafka\kafka-logs (kafka.server.LogDirFailureChannel)
> java.nio.file.FileSystemException: 
> D:\data\Kafka\kafka-logs\test4-1\.log -> 
> D:\data\Kafka\kafka-logs\test4-1\.log.deleted: The 
> process cannot access the file because it is being used by another process.
> at 
> sun.nio.fs.WindowsException.translateToIOException(WindowsException.java:86)
>  at sun.nio.fs.WindowsException.rethrowAsIOException(WindowsException.java:97)
>  at sun.nio.fs.WindowsFileCopy.move(WindowsFileCopy.java:387)
>  at 
> sun.nio.fs.WindowsFileSystemProvider.move(WindowsFileSystemProvider.java:287)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:1588)
>  at 
> kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
>  at 
> kafka.log.Log$$anonfun$deleteSegments$1$$anonfun$apply$mcI$sp$1.apply(Log.scala:1170)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply$mcI$sp(Log.scala:1170)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
>  at kafka.log.Log$$anonfun$deleteSegments$1.apply(Log.scala:1161)
>  at kafka.log.Log.maybeHandleIOException(Log.scala:1678)
>  at kafka.log.Log.deleteSegments(Log.scala:1161)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1156)
>  at kafka.log.Log.deleteRetentionMsBreachedSegments(Log.scala:1228)
>  at kafka.log.Log.deleteOldSegments(Log.scala:1222)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:854)
>  at kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:852)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
>  at scala.collection.immutable.List.foreach(List.scala:392)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:852)
>  at kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:385)
>  at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)
>  at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  
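
A minimal sketch, assuming plain java.nio APIs (this is not the attached patch), of 
the close-before-rename idea described in this issue:

{code:java}
// Sketch only, not the attached patch: close the handle, rename, then reopen if needed.
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

public class CloseBeforeRename {
    static FileChannel renameWithClosedHandle(FileChannel channel, Path from, Path to,
                                              boolean reopen) throws IOException {
        channel.close();                                          // Windows refuses to move open files
        Files.move(from, to, StandardCopyOption.ATOMIC_MOVE);
        return reopen
                ? FileChannel.open(to, StandardOpenOption.READ, StandardOpenOption.WRITE)
                : null;
    }
}
{code}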

[jira] [Commented] (KAFKA-9813) __consumer_offsets loaded cost very long time

2020-04-08 Thread leibo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077887#comment-17077887
 ] 

leibo commented on KAFKA-9813:
--

[~cricket007]

log.retention.bytes=104857600

log.segment.bytes=104857600

> __consumer_offsets loaded cost very long time
> -
>
> Key: KAFKA-9813
> URL: https://issues.apache.org/jira/browse/KAFKA-9813
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Major
> Attachments: 2020-04-03_163556.png
>
>
> __consumer_offsets took a very long time to load.
>  
> After I restarted Kafka (3 node HA), one __consumer_offsets partition took 41256909 
> ms (about 11 hours) to load, and the __consumer_offsets partitions loading after 
> __consumer_offsets-14 could not be loaded at all, so many consumers could not 
> commit offsets or consume.
> restart time:  2020-04-02 19:06
> __consumer_offsets-14 loaded time: 2020-04-03 06:32,  41256909 ms
> other info: 
>     1. there are 72408 consumer groups in the Kafka cluster, and most of them 
> (70498) are empty.
>     2. no exception can be found in server.log
>     3. the disk I/O, CPU and memory load are very low.
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9834) Add config to set ZSTD compression level

2020-04-08 Thread jiamei xie (Jira)
jiamei xie created KAFKA-9834:
-

 Summary: Add config to set ZSTD compression level
 Key: KAFKA-9834
 URL: https://issues.apache.org/jira/browse/KAFKA-9834
 Project: Kafka
  Issue Type: Bug
Reporter: jiamei xie
Assignee: jiamei xie


It seems Kafka uses the zstd default compression level 3 and doesn't have 
support for setting the zstd compression level.
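
For context, a minimal sketch of how zstd is enabled today on the producer; the level 
knob in the comment is hypothetical, since no such config exists yet:

{code:java}
// Sketch only: zstd can be enabled today via the existing compression.type config,
// but there is no public setting for the zstd level; the commented line is hypothetical.
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ZstdProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-host:9092");   // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");                // existing setting
        // props.put("compression.zstd.level", 3);   // hypothetical: no such config exists today
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce records as usual; batches are compressed with zstd at the library default level
        }
    }
}
{code}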



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]

2020-04-08 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17077864#comment-17077864
 ] 

Matthias J. Sax commented on KAFKA-9831:


The issue seems to be broker side? \cc [~guozhang] WDYT?

> Failing test: 
> EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
> --
>
> Key: KAFKA-9831
> URL: https://issues.apache.org/jira/browse/KAFKA-9831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: John Roesler
>Priority: Major
> Attachments: one.stdout.txt, two.stdout.txt
>
>
> I've seen this fail twice in a row on the same build, but with different 
> errors. Stacktraces follow; stdout is attached.
> One:
> {noformat}
> java.lang.AssertionError: Did not receive all 40 records from topic 
> singlePartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <40>
>  but: <39> was less than <40>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at org.junit.runners.Suite.runChild(Suite.java:128)
>   at org.junit.runners.Suite.runChild(Suite.java:27)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
>