[jira] [Commented] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration

2018-12-03 Thread Dmitry Buykin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708311#comment-16708311
 ] 

Dmitry Buykin commented on KAFKA-7695:
--

[~guozhang] I want to create my own PartitionAssignor which extends 
StreamsPartitionAssignor.

1) The global consumer fails first because it uses RangeAssignor.class by default 
and cannot work with StreamsPartitionAssignor.class. I could avoid this 
exception by setting two properties: 

{{props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, MyStreamsPartitionAssignor.class.getName());}}
{{props.put(consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), RangeAssignor.class.getName());}}

but this does not replace StreamsPartitionAssignor.class with 
MyStreamsPartitionAssignor.class in the StreamThreads.

2) The global consumer is just the first place where an exception is thrown. I 
need a custom PartitionAssignor to solve a duplicate-results issue with 
KStream-KStream left joins.
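
For reference, a compilable sketch of the two-property workaround described above. The assignor class name is taken from this comment and is hypothetical; the application id and bootstrap servers are placeholders:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Properties;

public final class AssignorOverrideWorkaround {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // Attempt to install the custom assignor on the consumers ...
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  "com.example.MyStreamsPartitionAssignor");                  // hypothetical user class
        // ... while keeping RangeAssignor on the consumer-prefixed config so the
        // global consumer does not reject the strategy.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
                  RangeAssignor.class.getName());

        // As reported above, this avoids the global-consumer exception but does not
        // actually swap StreamsPartitionAssignor out of the StreamThreads.
        return props;
    }
}
{code}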

> Cannot override StreamsPartitionAssignor in configuration 
> --
>
> Key: KAFKA-7695
> URL: https://issues.apache.org/jira/browse/KAFKA-7695
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Dmitry Buykin
>Priority: Major
>  Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing the property 
> partition.assignment.strategy in KStreams 2.0.1, because the streams 
> crash inside KafkaClientSupplier.getGlobalConsumer. The global consumer 
> works only with RangeAssignor, which is configured by default.
> This can be reproduced by setting
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> StreamsPartitionAssignor.class.getName());`
> To me, this looks like a bug.
> Opened a discussion here 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Issue Comment Deleted] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated KAFKA-7703:

Comment: was deleted

(was: I also noticed that TopicPartitionState is modified by multiple threads 
without any protection.)

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708272#comment-16708272
 ] 

Shixiong Zhu edited comment on KAFKA-7703 at 12/4/18 7:25 AM:
--

This seems to have been introduced by KAFKA-6397, which moved the offset-updating 
code to a different thread.


was (Author: zsxwing):
This seems introduced by KAFKA-6397 which defers the offset updating.

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708297#comment-16708297
 ] 

Shixiong Zhu commented on KAFKA-7703:
-

I also noticed that TopicPartitionState is modified by multiple threads without 
any protection.

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708272#comment-16708272
 ] 

Shixiong Zhu edited comment on KAFKA-7703 at 12/4/18 7:13 AM:
--

This seems to have been introduced by KAFKA-6397, which defers the offset update.


was (Author: zsxwing):
This seems introduced by KAFKA-6397 which moved the offset updating codes into 
a different thread.

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708273#comment-16708273
 ] 

Shixiong Zhu commented on KAFKA-7703:
-

I also noticed that TopicPartitionState is modified by multiple threads without 
any protection.

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7446) Better error message to explain the upper limit of TimeWindow

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708277#comment-16708277
 ] 

ASF GitHub Bot commented on KAFKA-7446:
---

guozhangwang closed pull request #5930: KAFKA-7446: Fix the duration and 
instant validation messages
URL: https://github.com/apache/kafka/pull/5930
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 819732ac6d8..fdeb8845611 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -82,6 +82,7 @@
 
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
 
 /**
  * A Kafka client that allows for performing continuous computation on input coming from one or more input topics and
@@ -919,7 +920,8 @@ public void run() {
      * @throws IllegalArgumentException if {@code timeout} can't be represented as {@code long milliseconds}
      */
     public synchronized boolean close(final Duration timeout) throws IllegalArgumentException {
-        ApiUtils.validateMillisecondDuration(timeout, "timeout");
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+        ApiUtils.validateMillisecondDuration(timeout, msgPrefix);
 
         final long timeoutMs = timeout.toMillis();
         if (timeoutMs < 0) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
index e888d7a120b..dd3b691b10c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
@@ -18,43 +18,57 @@
 
 import java.time.Duration;
 import java.time.Instant;
-import java.util.Objects;
+
+import static java.lang.String.format;
 
 public final class ApiUtils {
+
+    private static final String MILLISECOND_VALIDATION_FAIL_MSG_FRMT = "Invalid value for parameter \"%s\" (value was: %s). ";
+
     private ApiUtils() {
     }
 
     /**
      * Validates that milliseconds from {@code duration} can be retrieved.
      * @param duration Duration to check.
-     * @param name Name of params for an error message.
+     * @param messagePrefix Prefix text for an error message.
      * @return Milliseconds from {@code duration}.
      */
-    public static long validateMillisecondDuration(final Duration duration, final String name) {
+    public static long validateMillisecondDuration(final Duration duration, final String messagePrefix) {
         try {
             if (duration == null)
-                throw new IllegalArgumentException("[" + Objects.toString(name) + "] shouldn't be null.");
+                throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
 
             return duration.toMillis();
         } catch (final ArithmeticException e) {
-            throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e);
+            throw new IllegalArgumentException(messagePrefix + "It can't be converted to milliseconds.", e);
         }
     }
 
     /**
      * Validates that milliseconds from {@code instant} can be retrieved.
      * @param instant Instant to check.
-     * @param name Name of params for an error message.
+     * @param messagePrefix Prefix text for an error message.
      * @return Milliseconds from {@code instant}.
      */
-    public static long validateMillisecondInstant(final Instant instant, final String name) {
+    public static long validateMillisecondInstant(final Instant instant, final String messagePrefix) {
         try {
             if (instant == null)
-                throw new IllegalArgumentException("[" + name + "] shouldn't be null.");
+                throw new IllegalArgumentException(messagePrefix + "It shouldn't be null.");
 
             return instant.toEpochMilli();
         } catch (final ArithmeticException e) {
-            throw new IllegalArgumentException("[" + name + "] can't be converted to milliseconds. ", e);
+            throw new IllegalArgumentException(messagePrefix + "It can't be converted to milliseconds.", e);
         }
     }
+
+    /**
+     * Generates the prefix message for validateMillisecondXX() utility
+     * @param value Object to be converted to milliseconds
+     * @param name Object name
+     * @return Error message 
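
To make the effect of the change above concrete (the helper method at the end of the diff is truncated in this message), here is a stand-alone, illustrative sketch of the new message format taken from the MILLISECOND_VALIDATION_FAIL_MSG_FRMT constant in the hunk; it is not the merged implementation:

{code:java}
import java.time.Duration;

public final class MillisCheckExample {

    // Illustrative only: mirrors MILLISECOND_VALIDATION_FAIL_MSG_FRMT from the diff above.
    private static String prefixFor(final Object value, final String name) {
        return String.format("Invalid value for parameter \"%s\" (value was: %s). ", name, value);
    }

    public static void main(final String[] args) {
        final Duration timeout = null;
        final String msgPrefix = prefixFor(timeout, "timeout");
        // Prints: Invalid value for parameter "timeout" (value was: null). It shouldn't be null.
        System.out.println(msgPrefix + "It shouldn't be null.");
    }
}
{code}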

[jira] [Updated] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated KAFKA-7703:

Affects Version/s: 1.1.0
   1.1.1
   2.0.0
   2.0.1

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708272#comment-16708272
 ] 

Shixiong Zhu commented on KAFKA-7703:
-

This seems to have been introduced by KAFKA-6397, which moved the offset-updating 
code into a different thread.

> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7696) kafka-delegation-tokens.sh using a config file that contains security.protocol=SASL_PLAINTEXT throws OutOfMemoryError if it tries to connect to an SSL-enabled secured broker

2018-12-03 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708257#comment-16708257
 ] 

Manikumar commented on KAFKA-7696:
--

[~asasvari]  This is related to KAFKA-4493. There is a WIP PR 
https://github.com/apache/kafka/pull/5940.

> kafka-delegation-tokens.sh using a config file that contains 
> security.protocol=SASL_PLAINTEXT throws OutOfMemoryError if it tries to 
> connect to an SSL-enabled secured broker
> -
>
> Key: KAFKA-7696
> URL: https://issues.apache.org/jira/browse/KAFKA-7696
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0, 2.0.0
>Reporter: Attila Sasvari
>Assignee: Viktor Somogyi
>Priority: Major
>
> When the command-config file of kafka-delegation-tokens contains 
> security.protocol=SASL_PLAINTEXT instead of SASL_SSL (i.e. due to a user 
> error), the process throws a java.lang.OutOfMemoryError upon a connection 
> attempt to a secured (i.e. Kerberized, SSL-enabled) Kafka broker.
> {code}
> [2018-12-03 11:27:13,221] ERROR Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>   at 
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:407)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:497)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:207)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:533)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:468)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1125)
>   at java.lang.Thread.run(Thread.java:745)
> {code}
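
For reference, the mismatch is on the client side: the broker endpoint speaks SASL over SSL while the tool was pointed at it with SASL_PLAINTEXT, and the plaintext client typically misreads the TLS handshake bytes as a record length and tries to allocate a huge buffer (the OOM above). A hedged sketch of settings that line up with such a broker, expressed as AdminClient properties (host, mechanism and paths are illustrative); the same keys would go into the --command-config file:

{code:java}
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.config.SslConfigs;

import java.util.Properties;

public final class DelegationTokenClientConfig {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker:9093");        // illustrative
        // The broker listener is Kerberized and SSL-enabled, so SASL_SSL is required,
        // not SASL_PLAINTEXT.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "GSSAPI");                               // illustrative
        props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks"); // illustrative
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");              // illustrative

        try (AdminClient admin = AdminClient.create(props)) {
            // delegation-token operations would go here
        }
    }
}
{code}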



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708142#comment-16708142
 ] 

ASF GitHub Bot commented on KAFKA-7704:
---

huxihx opened a new pull request #5998: KAFKA-7704: MaxLag.Replica metric is 
reported incorrectly
URL: https://github.com/apache/kafka/pull/5998
 
 
   On the follower side, for an empty `LogAppendInfo` retrieved from the leader, 
fetcherLagStats sets the wrong lag because `nextOffset` is zero in that case, 
which actually means no lag. The lag should therefore be set to 0 if 
`nextOffset` is 0 or `logAppendInfo.lastOffset` is -1.
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
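
For reference, a stand-alone sketch of the lag rule described in the pull-request message above; the method and parameter names are illustrative and this is not the actual fetcher code:

{code:java}
public final class FollowerLagRule {
    // Illustrative only: encodes the rule from the PR message, not the real implementation.
    static long adjustedLag(final long nextOffset, final long lastAppendedOffset, final long computedLag) {
        // An empty LogAppendInfo from the leader (lastOffset == -1) or nextOffset == 0
        // actually means "no lagging", so report 0 instead of the raw value.
        return (nextOffset == 0L || lastAppendedOffset == -1L) ? 0L : computedLag;
    }

    public static void main(final String[] args) {
        System.out.println(adjustedLag(0L, -1L, 99043947579L)); // prints 0 rather than the latest offset
    }
}
{code}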


> kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported 
> incorrectly
> ---
>
> Key: KAFKA-7704
> URL: https://issues.apache.org/jira/browse/KAFKA-7704
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.1.0
>Reporter: Yu Yang
>Priority: Major
> Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png
>
>
> We recently deployed kafka 2.1 and noticed a jump in the 
> kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
> there are no under-replicated partitions for the cluster. 
> The initial analysis shows that kafka 2.1.0 does not report the metric correctly 
> for topics that have no incoming traffic right now but had traffic earlier. 
> For those topics, ReplicaFetcherManager considers the maxLag to be the 
> latest offset. 
> For instance, we have a topic named `test_topic`: 
> {code}
> [root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
> total 8
> -rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
> -rw-rw-r-- 1 kafka kafka        0 Sep 23 03:01 099043947579.log
> -rw-rw-r-- 1 kafka kafka       10 Dec  4 00:13 099043947579.snapshot
> -rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
> -rw-rw-r-- 1 kafka kafka        4 Dec  4 00:13 leader-epoch-checkpoint
> {code}
> kafka reports ReplicaFetcherManager.MaxLag.Replica to be 99043947579
>  !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-03 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-7704:
-

Assignee: huxihx

> kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported 
> incorrectly
> ---
>
> Key: KAFKA-7704
> URL: https://issues.apache.org/jira/browse/KAFKA-7704
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.1.0
>Reporter: Yu Yang
>Assignee: huxihx
>Priority: Major
> Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png
>
>
> We recently deployed kafka 2.1 and noticed a jump in the 
> kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
> there are no under-replicated partitions for the cluster. 
> The initial analysis shows that kafka 2.1.0 does not report the metric correctly 
> for topics that have no incoming traffic right now but had traffic earlier. 
> For those topics, ReplicaFetcherManager considers the maxLag to be the 
> latest offset. 
> For instance, we have a topic named `test_topic`: 
> {code}
> [root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
> total 8
> -rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
> -rw-rw-r-- 1 kafka kafka        0 Sep 23 03:01 099043947579.log
> -rw-rw-r-- 1 kafka kafka       10 Dec  4 00:13 099043947579.snapshot
> -rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
> -rw-rw-r-- 1 kafka kafka        4 Dec  4 00:13 leader-epoch-checkpoint
> {code}
> kafka reports ReplicaFetcherManager.MaxLag.Replica to be 99043947579
>  !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7705) Update javadoc for the values of delivery.timeout.ms or linger.ms

2018-12-03 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx updated KAFKA-7705:
--
Summary: Update javadoc for the values of delivery.timeout.ms or linger.ms  
(was: Update javadoc for default value of delivery.timeout.ms)

> Update javadoc for the values of delivery.timeout.ms or linger.ms
> -
>
> Key: KAFKA-7705
> URL: https://issues.apache.org/jira/browse/KAFKA-7705
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 2.1.0
>Reporter: huxihx
>Priority: Minor
>  Labels: newbie
>
> In 
> [https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]
> the sample producer code fails to run due to the ConfigException thrown: 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms
> The given value for delivery.timeout.ms or linger.ms on that page should be 
> updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7705) Update javadoc for default value of delivery.timeout.ms

2018-12-03 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx updated KAFKA-7705:
--
Description: 
In 
[https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]

the sample producer code fails to run due to the ConfigException thrown: 
delivery.timeout.ms should be equal to or larger than linger.ms + 
request.timeout.ms

The given value for delivery.timeout.ms or linger.ms in that page should be 
updated accordingly.

  was:
In 
[https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]

the sample producer code fails to run due to the ConfigException thrown: 
delivery.timeout.ms should be equal to or larger than linger.ms + 
request.timeout.ms

The default value for delivery.timeout.ms or linger.ms should be updated 
accordingly.


> Update javadoc for default value of delivery.timeout.ms
> ---
>
> Key: KAFKA-7705
> URL: https://issues.apache.org/jira/browse/KAFKA-7705
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 2.1.0
>Reporter: huxihx
>Priority: Minor
>  Labels: newbie
>
> In 
> [https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]
> the sample producer code fails to run due to the ConfigException thrown: 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms
> The given value for delivery.timeout.ms or linger.ms in that page should be 
> updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7705) Update javadoc for default value of delivery.timeout.ms

2018-12-03 Thread huxihx (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx updated KAFKA-7705:
--
Description: 
In 
[https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]

the sample producer code fails to run due to the ConfigException thrown: 
delivery.timeout.ms should be equal to or larger than linger.ms + 
request.timeout.ms

The given value for delivery.timeout.ms or linger.ms on that page should be 
updated accordingly.

  was:
In 
[https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]

the sample producer code fails to run due to the ConfigException thrown: 
delivery.timeout.ms should be equal to or larger than linger.ms + 
request.timeout.ms

The given value for delivery.timeout.ms or linger.ms in that page should be 
updated accordingly.


> Update javadoc for default value of delivery.timeout.ms
> ---
>
> Key: KAFKA-7705
> URL: https://issues.apache.org/jira/browse/KAFKA-7705
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 2.1.0
>Reporter: huxihx
>Priority: Minor
>  Labels: newbie
>
> In 
> [https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]
> the sample producer code fails to run due to the ConfigException thrown: 
> delivery.timeout.ms should be equal to or larger than linger.ms + 
> request.timeout.ms
> The given value for delivery.timeout.ms or linger.ms on that page should be 
> updated accordingly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7705) Update javadoc for default value of delivery.timeout.ms

2018-12-03 Thread huxihx (JIRA)
huxihx created KAFKA-7705:
-

 Summary: Update javadoc for default value of delivery.timeout.ms
 Key: KAFKA-7705
 URL: https://issues.apache.org/jira/browse/KAFKA-7705
 Project: Kafka
  Issue Type: Bug
  Components: documentation
Affects Versions: 2.1.0
Reporter: huxihx


In 
[https://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html,]

the sample producer code fails to run due to the ConfigException thrown: 
delivery.timeout.ms should be equal to or larger than linger.ms + 
request.timeout.ms

The default value for delivery.timeout.ms or linger.ms should be updated 
accordingly.
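
For illustration, a producer configuration that satisfies the constraint the exception complains about; the numeric values are examples chosen to satisfy the inequality, not the documented defaults:

{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public final class ProducerTimeoutExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // The constraint behind the ConfigException:
        //   delivery.timeout.ms >= linger.ms + request.timeout.ms
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000);
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 31000);             // 1000 + 30000

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // records would be sent here
        }
    }
}
{code}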



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-03 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7704:
---
Description: 
We recently deployed kafka 2.1, and noticed a jump in 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there is no under-replicated partitions. 

The initial analysis showed that kafka 2.1.0 does not report metric correctly 
for topics that have no incoming traffic right now, but had traffic earlier. 
For those topics, ReplicaFetcherManager will consider the maxLag be the latest 
offset. 

For instance, we have a topic named `test_topic`: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka   10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



  was:
We recently deployed kafka 2.1, and noticed a jump in 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there is no under-replicated partitions. 

The initial analysis showed that kafka 2.1.0 does not report metric correctly 
for topics that have no incoming traffic right now, but had traffic earlier. 
For those topics, ReplicaFetcherManager will consider the maxLag be the latest 
offset. 

For instance, we have a topic *test_topic*: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka   10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 




> kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported 
> incorrectly
> ---
>
> Key: KAFKA-7704
> URL: https://issues.apache.org/jira/browse/KAFKA-7704
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.1.0
>Reporter: Yu Yang
>Priority: Major
> Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png
>
>
> We recently deployed kafka 2.1 and noticed a jump in the 
> kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
> there are no under-replicated partitions for the cluster. 
> The initial analysis shows that kafka 2.1.0 does not report the metric correctly 
> for topics that have no incoming traffic right now but had traffic earlier. 
> For those topics, ReplicaFetcherManager considers the maxLag to be the 
> latest offset. 
> For instance, we have a topic named `test_topic`: 
> {code}
> [root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
> total 8
> -rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
> -rw-rw-r-- 1 kafka kafka        0 Sep 23 03:01 099043947579.log
> -rw-rw-r-- 1 kafka kafka       10 Dec  4 00:13 099043947579.snapshot
> -rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
> -rw-rw-r-- 1 kafka kafka        4 Dec  4 00:13 leader-epoch-checkpoint
> {code}
> kafka reports ReplicaFetcherManager.MaxLag.Replica to be 99043947579
>  !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708017#comment-16708017
 ] 

ASF GitHub Bot commented on KAFKA-7697:
---

rajinisivaram closed pull request #5997: KAFKA-7697: Avoid blocking for 
leaderIsrUpdateLock in DelayedFetch
URL: https://github.com/apache/kafka/pull/5997
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 745c89a393b..a5655c77e2d 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -836,6 +836,20 @@ class Partition(val topicPartition: TopicPartition,
     localReplica.offsetSnapshot
   }
 
+  def maybeFetchOffsetSnapshot(currentLeaderEpoch: Optional[Integer],
+                               fetchOnlyFromLeader: Boolean): Option[LogOffsetSnapshot] = {
+    if (leaderIsrUpdateLock.readLock().tryLock()) {
+      try {
+        // decide whether to only fetch from leader
+        val localReplica = localReplicaWithEpochOrException(currentLeaderEpoch, fetchOnlyFromLeader)
+        Some(localReplica.offsetSnapshot)
+      } finally {
+        leaderIsrUpdateLock.readLock().unlock()
+      }
+    } else
+      None
+  }
+
   def fetchOffsetSnapshotOrError(currentLeaderEpoch: Optional[Integer],
                                  fetchOnlyFromLeader: Boolean): Either[LogOffsetSnapshot, Errors] = {
     inReadLock(leaderIsrUpdateLock) {
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 90200991759..d6504e64de9 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -84,34 +84,35 @@ class DelayedFetch(delayMs: Long,
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
             val partition = replicaManager.getPartitionOrException(topicPartition,
               expectLeader = fetchMetadata.fetchOnlyLeader)
-            val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)
+            partition.maybeFetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader).foreach { offsetSnapshot =>
 
-            val endOffset = fetchMetadata.fetchIsolation match {
-              case FetchLogEnd => offsetSnapshot.logEndOffset
-              case FetchHighWatermark => offsetSnapshot.highWatermark
-              case FetchTxnCommitted => offsetSnapshot.lastStableOffset
-            }
+              val endOffset = fetchMetadata.fetchIsolation match {
+                case FetchLogEnd => offsetSnapshot.logEndOffset
+                case FetchHighWatermark => offsetSnapshot.highWatermark
+                case FetchTxnCommitted => offsetSnapshot.lastStableOffset
+              }
 
-            // Go directly to the check for Case D if the message offsets are the same. If the log segment
-            // has just rolled, then the high watermark offset will remain the same but be on the old segment,
-            // which would incorrectly be seen as an instance of Case C.
-            if (endOffset.messageOffset != fetchOffset.messageOffset) {
-              if (endOffset.onOlderSegment(fetchOffset)) {
-                // Case C, this can happen when the new fetch operation is on a truncated leader
-                debug(s"Satisfying fetch $fetchMetadata since it is fetching later segments of partition $topicPartition.")
-                return forceComplete()
-              } else if (fetchOffset.onOlderSegment(endOffset)) {
-                // Case C, this can happen when the fetch operation is falling behind the current segment
-                // or the partition has just rolled a new segment
-                debug(s"Satisfying fetch $fetchMetadata immediately since it is fetching older segments.")
-                // We will not force complete the fetch request if a replica should be throttled.
-                if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
+              // Go directly to the check for Case D if the message offsets are the same. If the log segment
+              // has just rolled, then the high watermark offset will remain the same but be on the old segment,
+              // which would incorrectly be seen as an instance of Case C.
+              if (endOffset.messageOffset != fetchOffset.messageOffset) {
+                if (endOffset.onOlderSegment(fetchOffset)) {
+                  // Case C, this can happen when the new fetch operation is on a truncated leader
+

[jira] [Commented] (KAFKA-7591) Changelog retention period doesn't synchronise with window-store size

2018-12-03 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7591?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708021#comment-16708021
 ] 

John Roesler commented on KAFKA-7591:
-

I looked into the code, and for reference, the reason the config doesn't get 
propagated is that in 
`org.apache.kafka.streams.processor.internals.InternalTopicManager#makeReady`, 
we first fetch the number of partitions for each internal topic in the 
topology. If the topic exists and has the right number of partitions, then we 
do nothing.

There's no idempotent create operation, so we'd either have to idempotently 
"alter configs" on every topic or fetch the configs and only alter the ones 
that don't match the configs that come from the topology.

I don't think it would be practical, without changing the code structure, to do 
this only for window stores, and it might be nice to have changes from other 
internal topic configs propagated.
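
In the meantime, the changelog topic's retention can be adjusted by hand; a minimal sketch with the AdminClient is below (topic name and retention value are illustrative). As a caveat, alterConfigs sets the resource's dynamic config as a whole, which is part of why an idempotent per-key update from Streams is not straightforward, as described above:

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;

public final class ChangelogRetentionFix {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // illustrative

        try (AdminClient admin = AdminClient.create(props)) {
            // Changelog topic name pattern: <application.id>-<store-name>-changelog (illustrative).
            final ConfigResource topic =
                new ConfigResource(ConfigResource.Type.TOPIC, "my-app-my-window-store-changelog");
            // New window size + changelog additional retention, in milliseconds (example value).
            final Config retention = new Config(
                Collections.singletonList(new ConfigEntry("retention.ms", "90000000")));
            admin.alterConfigs(Collections.singletonMap(topic, retention)).all().get();
        }
    }
}
{code}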

> Changelog retention period doesn't synchronise with window-store size
> -
>
> Key: KAFKA-7591
> URL: https://issues.apache.org/jira/browse/KAFKA-7591
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jon Bates
>Priority: Major
>
> When a new windowed state store is created, the associated changelog topic's 
> `retention.ms` value is set to `window-size + 
> CHANGELOG_ADDITIONAL_RETENTION_MS`
> h3. Expected Behaviour
> If the window-size is updated, the changelog topic's `retention.ms` config 
> should be updated to reflect the new size
> h3. Actual Behaviour
> The changelog-topic's `retention.ms` setting is not amended, resulting in 
> possible loss of data upon application restart
>  
> n.b. Although it is easy to update changelog topic config, I logged this as 
> `major` due to the potential for data-loss for any user of Kafka-Streams who 
> may not be intimately aware of the relationship between a windowed store and 
> the changelog config



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-03 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7704:
---
Description: 
We recently deployed kafka 2.1, and noticed a jump in 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there is no under-replicated partitions for the cluster. 

The initial analysis shows that kafka 2.1.0 does not report metric correctly 
for topics that have no incoming traffic right now, but had traffic earlier. 
For those topics, ReplicaFetcherManager will consider the maxLag be the latest 
offset. 

For instance, we have a topic named `test_topic`: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka   10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



  was:
We recently deployed kafka 2.1, and noticed a jump in 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there is no under-replicated partitions for the cluster. 

The initial analysis showed that kafka 2.1.0 does not report metric correctly 
for topics that have no incoming traffic right now, but had traffic earlier. 
For those topics, ReplicaFetcherManager will consider the maxLag be the latest 
offset. 

For instance, we have a topic named `test_topic`: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka   10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 




> kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported 
> incorrectly
> ---
>
> Key: KAFKA-7704
> URL: https://issues.apache.org/jira/browse/KAFKA-7704
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.1.0
>Reporter: Yu Yang
>Priority: Major
> Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png
>
>
> We recently deployed kafka 2.1 and noticed a jump in the 
> kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
> there are no under-replicated partitions for the cluster. 
> The initial analysis shows that kafka 2.1.0 does not report the metric correctly 
> for topics that have no incoming traffic right now but had traffic earlier. 
> For those topics, ReplicaFetcherManager considers the maxLag to be the 
> latest offset. 
> For instance, we have a topic named `test_topic`: 
> {code}
> [root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
> total 8
> -rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
> -rw-rw-r-- 1 kafka kafka        0 Sep 23 03:01 099043947579.log
> -rw-rw-r-- 1 kafka kafka       10 Dec  4 00:13 099043947579.snapshot
> -rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
> -rw-rw-r-- 1 kafka kafka        4 Dec  4 00:13 leader-epoch-checkpoint
> {code}
> kafka reports ReplicaFetcherManager.MaxLag.Replica to be 99043947579
>  !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-03 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7704:
---
Description: 
We recently deployed kafka 2.1, and noticed a jump in 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there is no under-replicated partitions for the cluster. 

The initial analysis showed that kafka 2.1.0 does not report metric correctly 
for topics that have no incoming traffic right now, but had traffic earlier. 
For those topics, ReplicaFetcherManager will consider the maxLag be the latest 
offset. 

For instance, we have a topic named `test_topic`: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka   10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



  was:
We recently deployed kafka 2.1, and noticed a jump in 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there is no under-replicated partitions. 

The initial analysis showed that kafka 2.1.0 does not report metric correctly 
for topics that have no incoming traffic right now, but had traffic earlier. 
For those topics, ReplicaFetcherManager will consider the maxLag be the latest 
offset. 

For instance, we have a topic named `test_topic`: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka   10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 




> kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported 
> incorrectly
> ---
>
> Key: KAFKA-7704
> URL: https://issues.apache.org/jira/browse/KAFKA-7704
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.1.0
>Reporter: Yu Yang
>Priority: Major
> Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png
>
>
> We recently deployed kafka 2.1 and noticed a jump in the 
> kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
> there are no under-replicated partitions for the cluster. 
> The initial analysis showed that kafka 2.1.0 does not report the metric correctly 
> for topics that have no incoming traffic right now but had traffic earlier. 
> For those topics, ReplicaFetcherManager considers the maxLag to be the 
> latest offset. 
> For instance, we have a topic named `test_topic`: 
> {code}
> [root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
> total 8
> -rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
> -rw-rw-r-- 1 kafka kafka        0 Sep 23 03:01 099043947579.log
> -rw-rw-r-- 1 kafka kafka       10 Dec  4 00:13 099043947579.snapshot
> -rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
> -rw-rw-r-- 1 kafka kafka        4 Dec  4 00:13 leader-epoch-checkpoint
> {code}
> kafka reports ReplicaFetcherManager.MaxLag.Replica to be 99043947579
>  !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-03 Thread Yu Yang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yu Yang updated KAFKA-7704:
---
Description: 
We recently deployed kafka 2.1, and noticed a jump in 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there is no under-replicated partitions. 

The initial analysis showed that kafka 2.1.0 does not report metric correctly 
for topics that have no incoming traffic right now, but had traffic earlier. 
For those topics, ReplicaFetcherManager will consider the maxLag be the latest 
offset. 

For instance, we have a topic *test_topic*: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka   10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



  was:
We deployed kafka 2.1, and noticed a jump in 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there is no under-replicated partitions. 

The initial analysis showed that kafka 2.1.0 does not report metric correctly 
for topics that have no incoming traffic right now, but had traffic earlier. 
For those topics, ReplicaFetcherManager will consider the maxLag be the latest 
offset. 

For instance, we have a topic *test_topic*: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka   10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 




> kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported 
> incorrectly
> ---
>
> Key: KAFKA-7704
> URL: https://issues.apache.org/jira/browse/KAFKA-7704
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.1.0
>Reporter: Yu Yang
>Priority: Major
> Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png
>
>
> We recently deployed kafka 2.1 and noticed a jump in the 
> kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
> there are no under-replicated partitions. 
> The initial analysis showed that kafka 2.1.0 does not report the metric correctly 
> for topics that have no incoming traffic right now but had traffic earlier. 
> For those topics, ReplicaFetcherManager considers the maxLag to be the 
> latest offset. 
> For instance, we have a topic *test_topic*: 
> {code}
> [root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
> total 8
> -rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
> -rw-rw-r-- 1 kafka kafka        0 Sep 23 03:01 099043947579.log
> -rw-rw-r-- 1 kafka kafka       10 Dec  4 00:13 099043947579.snapshot
> -rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
> -rw-rw-r-- 1 kafka kafka        4 Dec  4 00:13 leader-epoch-checkpoint
> {code}
> kafka reports ReplicaFetcherManager.MaxLag.Replica to be 99043947579
>  !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7704) kafka.server.ReplicaFetechManager.MaxLag.Replica metric is reported incorrectly

2018-12-03 Thread Yu Yang (JIRA)
Yu Yang created KAFKA-7704:
--

 Summary: kafka.server.ReplicaFetechManager.MaxLag.Replica metric 
is reported incorrectly
 Key: KAFKA-7704
 URL: https://issues.apache.org/jira/browse/KAFKA-7704
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Affects Versions: 2.1.0
Reporter: Yu Yang
 Attachments: Screen Shot 2018-12-03 at 4.33.35 PM.png

We deployed kafka 2.1 and noticed a jump in the 
kafka.server.ReplicaFetcherManager.MaxLag.Replica metric. At the same time, 
there are no under-replicated partitions. 

The initial analysis showed that kafka 2.1.0 does not report the metric correctly 
for topics that have no incoming traffic right now but had traffic earlier. 
For those topics, ReplicaFetcherManager considers the maxLag to be the latest 
offset. 

For instance, we have a topic *test_topic*: 

{code}
[root@kafkabroker03002:/mnt/kafka/test_topic-0]# ls -l
total 8
-rw-rw-r-- 1 kafka kafka 10485760 Dec  4 00:13 099043947579.index
-rw-rw-r-- 1 kafka kafka        0 Sep 23 03:01 099043947579.log
-rw-rw-r-- 1 kafka kafka       10 Dec  4 00:13 099043947579.snapshot
-rw-rw-r-- 1 kafka kafka 10485756 Dec  4 00:13 099043947579.timeindex
-rw-rw-r-- 1 kafka kafka        4 Dec  4 00:13 leader-epoch-checkpoint
{code}

kafka reports ReplicaFetcherManager.MaxLag.Replica to be 99043947579

 !Screen Shot 2018-12-03 at 4.33.35 PM.png|width=720px! 





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708006#comment-16708006
 ] 

ASF GitHub Bot commented on KAFKA-7697:
---

rajinisivaram opened a new pull request #5997: KAFKA-7697: Avoid blocking for 
leaderIsrUpdateLock in DelayedFetch
URL: https://github.com/apache/kafka/pull/5997
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration

2018-12-03 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16708004#comment-16708004
 ] 

John Roesler commented on KAFKA-7695:
-

Perhaps, if it's indeed the case that the partition assignment strategy is not 
overridable in Streams, we could instead throw an exception early when we build 
StreamsConfig. At least, then, we could explain that this particular consumer 
config is not settable for Streams, and [~zirx] wouldn't have had to discover 
it after tracking down a mysterious stacktrace. Arguably, if the documentation 
and API both suggest you can do something, but you get an exception when you do 
it, it *is* a bug.
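
A rough sketch of such an early check, purely illustrative (the class and 
method names below are hypothetical, not actual Streams internals):

{code:java}
// Hypothetical early validation; class and method names are illustrative only.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.ConfigException;

import java.util.Map;

final class AssignorOverrideCheck {
    static void validate(final Map<String, Object> consumerConfigs) {
        if (consumerConfigs.containsKey(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)) {
            throw new ConfigException(
                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    consumerConfigs.get(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
                    "Kafka Streams manages its own partition assignor; this consumer config cannot be overridden.");
        }
    }
}
{code}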

[~zirx], it sounds like the feature you want to request is some way to cut down 
on intermediate `(left, null)` join results. Is that right?
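
For context, a minimal sketch (assuming illustrative topic names, String 
values, and a 5-minute window) of a KStream-KStream left join that emits such 
intermediate results while no matching right-side record has arrived:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

import java.util.concurrent.TimeUnit;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> left = builder.stream("left-topic");
KStream<String, String> right = builder.stream("right-topic");

left.leftJoin(
        right,
        // rightValue is null when no matching right record is available,
        // producing the intermediate (left, null) results discussed above
        (leftValue, rightValue) -> leftValue + "," + rightValue,
        JoinWindows.of(TimeUnit.MINUTES.toMillis(5)))
    .to("joined-topic");
{code}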

> Cannot override StreamsPartitionAssignor in configuration 
> --
>
> Key: KAFKA-7695
> URL: https://issues.apache.org/jira/browse/KAFKA-7695
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Dmitry Buykin
>Priority: Major
>  Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing property 
> partition.assignment.strategy in KStreams 2.0.1 because the streams are 
> crashing inside KafkaClientSupplier.getGlobalConsumer. This GlobalConsumer 
> works only with RangeAssignor which configured by default.
> Could be reproduced by setting up
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> StreamsPartitionAssignor.class.getName());`
> For me it looks like a bug.
> Opened a discussion here 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shixiong Zhu updated KAFKA-7703:

Description: 
After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong offset 
set by another reset request.

Here is a reproducer: 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246

In this reproducer, "poll(0)" will send an "earliest" request in background. 
However, after "seekToEnd" is called, due to a race condition in 
"Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
between the check 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
 and the seek 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
 "KafkaConsumer.position" may return an "earliest" offset.


  was:
After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong offset 
set by another reset request.

Here is a reproducer: 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246

In this reproducer, "poll(0)" will send an "earliest" request in background. 
However, after "seekToEnd" is called, due to a race condition in 
"Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
between 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
 and 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
 "KafkaConsumer.position" may return an "earliest" offset.



> KafkaConsumer.position may return a wrong offset after "seekToEnd" is called
> 
>
> Key: KAFKA-7703
> URL: https://issues.apache.org/jira/browse/KAFKA-7703
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.1.0
>Reporter: Shixiong Zhu
>Priority: Major
>
> After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong 
> offset set by another reset request.
> Here is a reproducer: 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246
> In this reproducer, "poll(0)" will send an "earliest" request in background. 
> However, after "seekToEnd" is called, due to a race condition in 
> "Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
> between the check 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
>  and the seek 
> https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
>  "KafkaConsumer.position" may return an "earliest" offset.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7703) KafkaConsumer.position may return a wrong offset after "seekToEnd" is called

2018-12-03 Thread Shixiong Zhu (JIRA)
Shixiong Zhu created KAFKA-7703:
---

 Summary: KafkaConsumer.position may return a wrong offset after 
"seekToEnd" is called
 Key: KAFKA-7703
 URL: https://issues.apache.org/jira/browse/KAFKA-7703
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.1.0
Reporter: Shixiong Zhu


After "seekToEnd" is called, "KafkaConsumer.position" may return a wrong offset 
set by another reset request.

Here is a reproducer: 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246

In this reproducer, "poll(0)" will send an "earliest" request in background. 
However, after "seekToEnd" is called, due to a race condition in 
"Fetcher.resetOffsetIfNeeded" (It's not atomic, "seekToEnd" could happen 
between 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R585
 and 
https://github.com/zsxwing/kafka/commit/4e1aa11bfa99a38ac1e2cb0872c055db56b33246#diff-b45245913eaae46aa847d2615d62cde0R605),
 "KafkaConsumer.position" may return an "earliest" offset.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2018-12-03 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707975#comment-16707975
 ] 

Rajini Sivaram commented on KAFKA-7697:
---

Changes made under KAFKA-7395 now protect fetch using the Partition's 
{{leaderIsrUpdateLock}}. This results in the read lock being acquired while 
completing a delayed fetch. This is unsafe since delayed operations can be 
completed while holding onto another Partition lock. For example, the thread 
dump for request-handler-4 shows:

{quote}
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x00070821f188> (a 
java.util.concurrent.locks.ReentrantReadWriteLock$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireShared(AbstractQueuedSynchronizer.java:967)
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireShared(AbstractQueuedSynchronizer.java:1283)
at 
java.util.concurrent.locks.ReentrantReadWriteLock$ReadLock.lock(ReentrantReadWriteLock.java:727)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:249)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.cluster.Partition.fetchOffsetSnapshot(Partition.scala:832)
at 
kafka.server.DelayedFetch.$anonfun$tryComplete$1(DelayedFetch.scala:87)
at 
kafka.server.DelayedFetch.$anonfun$tryComplete$1$adapted(DelayedFetch.scala:79)
at kafka.server.DelayedFetch$$Lambda$912/582152661.apply(Unknown Source)
at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:58)
at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:51)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at kafka.server.DelayedFetch.tryComplete(DelayedFetch.scala:79)
at 
kafka.server.DelayedOperation.maybeTryComplete(DelayedOperation.scala:121)
at 
kafka.server.DelayedOperationPurgatory$Watchers.tryCompleteWatched(DelayedOperation.scala:371)
at 
kafka.server.DelayedOperationPurgatory.checkAndComplete(DelayedOperation.scala:277)
at 
kafka.server.ReplicaManager.tryCompleteDelayedFetch(ReplicaManager.scala:307)
at 
kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:743)
at kafka.cluster.Partition$$Lambda$917/80048373.apply(Unknown Source)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:729)
at 
kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(ReplicaManager.scala:735)
at kafka.server.ReplicaManager$$Lambda$915/220982367.apply(Unknown 
Source)
at 
scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
at scala.collection.TraversableLike$$Lambda$12/1209669119.apply(Unknown 
Source)
at 
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:145)
at scala.collection.mutable.HashMap$$Lambda$24/477289012.apply(Unknown 
Source)
at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:235)
at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:228)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:145)
at scala.collection.TraversableLike.map(TraversableLike.scala:233)
at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at 
kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:723)
at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:470)
at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:482)
at kafka.server.KafkaApis.handle(KafkaApis.scala:106)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
at java.lang.Thread.run(Thread.java:748)
{quote}

A whole bunch of threads, including all request handler threads, appear to be 
deadlocked on the {{leaderIsrUpdateLock}} of two partitions: the read locks 
cannot be acquired while completing the delayed fetch because writers are 
already waiting on those locks.

For purgatory operations that acquire a lock, we use that lock as the 
delayed-operation lock, but that is not an option here since a fetch can span 
multiple partitions. So we need some other way to avoid blocking on one 
Partition's lock while holding onto another Partition's lock.
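
One general direction, shown only as a rough sketch (not necessarily what the 
linked PR does): try to acquire the other Partition's read lock without 
blocking, and skip or retry the completion if it is contended.

{code:java}
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class NonBlockingReadSketch {
    private final ReentrantReadWriteLock leaderIsrUpdateLock = new ReentrantReadWriteLock();

    // Returns false instead of parking when another thread holds the write lock,
    // so a delayed-fetch completion never blocks on a second Partition's lock.
    boolean tryReadWithoutBlocking(final Runnable readAction) {
        if (!leaderIsrUpdateLock.readLock().tryLock()) {
            return false;  // write lock held elsewhere; caller can retry the completion later
        }
        try {
            readAction.run();
            return true;
        } finally {
            leaderIsrUpdateLock.readLock().unlock();
        }
    }
}
{code}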

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: 

[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2018-12-03 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707972#comment-16707972
 ] 

Matthias J. Sax commented on KAFKA-5146:


[~guozhang] [~ijuma] [~miguno] How important do we think is the example? Could 
we just remove it? If we create a new module, do we need a KIP (maybe not?).

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael Noll
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> 
>   org.apache.kafka
>   connect-json
>   0.10.2.0
>   compile
>   
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so we that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration

2018-12-03 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707931#comment-16707931
 ] 

Matthias J. Sax commented on KAFKA-7695:


It's by design, so it's not a bug. I would close this as "not a bug" or update 
it as a "feature request" – however, the `StreamsPartitionAssignor` is a very 
central component and might result in incorrect processing if not implemented 
correctly. Thus, I am very hesitant to support a request like this and to allow 
overwriting it.

Also, it's unclear to me (as mentioned in the Slack discussion) how partition 
assignment could help with throttling, especially because reading data from 
topics is based on record timestamps to provide event-time semantics.

Last, for KStream-GlobalKTable joins, there is no timestamp synchronization by 
design. If you need timestamp synchronization, you need to use a KStream-KTable 
join instead. The two joins are semantically different.
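
For illustration, a minimal sketch of the two join variants; topic names and 
value types are assumptions:

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("events");

// KStream-KTable join: stream and table input are processed with timestamp
// synchronization, so table updates are applied in time order relative to the
// stream records.
KTable<String, String> table = builder.table("reference-data");
stream.join(table, (event, ref) -> event + "/" + ref);

// KStream-GlobalKTable join: the global table is bootstrapped and maintained
// independently; there is no timestamp synchronization against the stream.
GlobalKTable<String, String> globalTable = builder.globalTable("reference-data-global");
stream.join(globalTable,
        (key, event) -> key,                 // map each stream record to the global table's key
        (event, ref) -> event + "/" + ref);
{code}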

> Cannot override StreamsPartitionAssignor in configuration 
> --
>
> Key: KAFKA-7695
> URL: https://issues.apache.org/jira/browse/KAFKA-7695
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Dmitry Buykin
>Priority: Major
>  Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing property 
> partition.assignment.strategy in KStreams 2.0.1 because the streams are 
> crashing inside KafkaClientSupplier.getGlobalConsumer. This GlobalConsumer 
> works only with RangeAssignor which configured by default.
> Could be reproduced by setting up
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> StreamsPartitionAssignor.class.getName());`
> For me it looks like a bug.
> Opened a discussion here 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7702) Prefixed ACLs don't work with single character prefix

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707932#comment-16707932
 ] 

ASF GitHub Bot commented on KAFKA-7702:
---

rajinisivaram opened a new pull request #5994: KAFKA-7702: Fix matching of 
prefixed ACLs to match single char prefix
URL: https://github.com/apache/kafka/pull/5994
 
 
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Prefixed ACLs don't work with single character prefix
> -
>
> Key: KAFKA-7702
> URL: https://issues.apache.org/jira/browse/KAFKA-7702
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.1, 2.1.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.2.0, 2.1.1, 2.0.2
>
>
> Prefixed ACLs with a single character are not matched correctly against 
> resource names. An ALLOW rule with a single-character prefix doesn't grant 
> access to any resource, and a DENY rule with a single-character prefix 
> doesn't deny access to any resource, since the prefix is not matched 
> correctly.
> This is not an exploitable security vulnerability since only authenticated 
> users with authorization to create ACLs can create the prefixed ACLs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7702) Prefixed ACLs don't work with single character prefix

2018-12-03 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7702:
-

 Summary: Prefixed ACLs don't work with single character prefix
 Key: KAFKA-7702
 URL: https://issues.apache.org/jira/browse/KAFKA-7702
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 2.1.0, 2.0.1
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.2.0, 2.1.1, 2.0.2


Prefixed ACLs with a single character are not matched correctly against 
resource names. An ALLOW rule with a single-character prefix doesn't grant 
access to any resource, and a DENY rule with a single-character prefix doesn't 
deny access to any resource, since the prefix is not matched correctly.

This is not an exploitable security vulnerability since only authenticated 
users with authorization to create ACLs can create the prefixed ACLs.
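
For reference, a hedged example of how such a single-character prefixed ACL 
might be created via the AdminClient (principal, host, prefix, and the admin 
configuration are illustrative assumptions); with the bug described above, the 
resulting ALLOW rule does not match any topic:

{code:java}
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

import java.util.Collections;
import java.util.Properties;

static void createSingleCharPrefixedAcl(final Properties adminProps) throws Exception {
    // ALLOW User:alice to READ every topic whose name starts with "t".
    final AclBinding binding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, "t", PatternType.PREFIXED),
            new AccessControlEntry("User:alice", "*", AclOperation.READ, AclPermissionType.ALLOW));

    try (AdminClient admin = AdminClient.create(adminProps)) {
        admin.createAcls(Collections.singleton(binding)).all().get();
    }
}
{code}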



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7678) Failed to close producer due to java.lang.NullPointerException

2018-12-03 Thread Jonathan Santilli (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707879#comment-16707879
 ] 

Jonathan Santilli commented on KAFKA-7678:
--

PR submitted [~mjsax]

[https://github.com/apache/kafka/pull/5993]

 

Cheers!

--

Jonathan

> Failed to close producer due to java.lang.NullPointerException
> --
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Jonathan Santilli
>Assignee: Jonathan Santilli
>Priority: Minor
>  Labels: bug
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since it is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
> producer.close();
> producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7701) Revert kafka-trunk-jdk8 CI change to re-enable testAll-based builds

2018-12-03 Thread Colin Hicks (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin Hicks updated KAFKA-7701:
---
Description: 
As of Apache Jenkins build #3235, the kafka-trunk-jdk8 job was configured to 
build and test independently against Scala 2.12, followed by building and 
testing against Scala 2.11. Previously, the configuration leveraged the testAll 
Gradle task.

Build #3235 completed successfully. The previous series of kafka-trunk-jdk8 
failures correspond to the introduction of Gradle 5.0 in 
[https://github.com/apache/kafka/commit/4a0fd4c41b3255a6df932eb22bd4f45d38717641.]
 The consistent failure symptoms have been instances of 
java.lang.NoClassDefFoundError in Kafka Streams tests.

After addressing the issues assumed to be caused by the Gradle change, the CI 
configuration should be reverted to its previous state.

  was:
As of Apache Jenkins build #3235, the kafka-trunk-jdk8 job was configured to 
build and test independently against Scala 2.12, followed by building and 
testing against Scala 2.11. Previously, the configuration leveraged the testAll 
Gradle task.

Build #3235 completed successfully. The previous series of failures failures of 
the kafka-trunk-jdk8 job correspond to the introduction of Gradle 5.0 in 
[https://github.com/apache/kafka/commit/4a0fd4c41b3255a6df932eb22bd4f45d38717641.]
 The consistent failure symptoms have been instances of 
java.lang.NoClassDefFoundError in Kafka Streams tests.

After addressing the issues assumed to be caused by the Gradle change, the CI 
configuration should be reverted to its previous state.


> Revert kafka-trunk-jdk8 CI change to re-enable testAll-based builds
> ---
>
> Key: KAFKA-7701
> URL: https://issues.apache.org/jira/browse/KAFKA-7701
> Project: Kafka
>  Issue Type: Task
>Reporter: Colin Hicks
>Priority: Minor
>
> As of Apache Jenkins build #3235, the kafka-trunk-jdk8 job was configured to 
> build and test independently against Scala 2.12, followed by building and 
> testing against Scala 2.11. Previously, the configuration leveraged the 
> testAll Gradle task.
> Build #3235 completed successfully. The previous series of kafka-trunk-jdk8 
> failures correspond to the introduction of Gradle 5.0 in 
> [https://github.com/apache/kafka/commit/4a0fd4c41b3255a6df932eb22bd4f45d38717641.]
>  The consistent failure symptoms have been instances of 
> java.lang.NoClassDefFoundError in Kafka Streams tests.
> After addressing the issues assumed to be caused by the Gradle change, the CI 
> configuration should be reverted to its previous state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7701) Revert kafka-trunk-jdk8 CI change to re-enable testAll-based builds

2018-12-03 Thread Colin Hicks (JIRA)
Colin Hicks created KAFKA-7701:
--

 Summary: Revert kafka-trunk-jdk8 CI change to re-enable 
testAll-based builds
 Key: KAFKA-7701
 URL: https://issues.apache.org/jira/browse/KAFKA-7701
 Project: Kafka
  Issue Type: Task
Reporter: Colin Hicks


As of Apache Jenkins build #3235, the kafka-trunk-jdk8 job was configured to 
build and test independently against Scala 2.12, followed by building and 
testing against Scala 2.11. Previously, the configuration leveraged the testAll 
Gradle task.

Build #3235 completed successfully. The previous series of failures of the 
kafka-trunk-jdk8 job corresponds to the introduction of Gradle 5.0 in 
[https://github.com/apache/kafka/commit/4a0fd4c41b3255a6df932eb22bd4f45d38717641.]
 The consistent failure symptoms have been instances of 
java.lang.NoClassDefFoundError in Kafka Streams tests.

After addressing the issues assumed to be caused by the Gradle change, the CI 
configuration should be reverted to its previous state.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7700) AbstractConfig does not honor Properties defaults

2018-12-03 Thread Tommy Becker (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tommy Becker reassigned KAFKA-7700:
---

Assignee: (was: Tommy Becker)

> AbstractConfig does not honor Properties defaults
> -
>
> Key: KAFKA-7700
> URL: https://issues.apache.org/jira/browse/KAFKA-7700
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.1.0
>Reporter: Tommy Becker
>Priority: Minor
>
> Kafka clients such as the Consumer and Producer require various configuration 
> parameters to work. In the case of the Consumer and Producer, these 
> parameters are provided by passing either a {{Map}} or 
> {{Properties}} instance to the respective constructor.
> {{Properties}} is a legacy class (akin to {{Vector}}) that adds no value 
> above {{Map}} other than the ability to wrap another 
> {{Properties}} instance that provides defaults. But Kafka negates this 
> benefit by treating the {{Properties}} instance as a {{Map}}, which only 
> works due to an unfortunate decision to have {{Properties}} extend 
> {{Hashtable}}.  Such treatment bypasses the defaults because they are only 
> consulted by {{Properties.getProperty()}}. The net result is that when 
> creating Kafka clients via {{Properties}}, any configuration from its 
> defaults is ignored.
> This has been reported several times over the years as KAFKA-1909, 
> KAFKA-2184, KAFKA-3049, and KAFKA-5514. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707628#comment-16707628
 ] 

ASF GitHub Bot commented on KAFKA-7698:
---

mingaliu opened a new pull request #5992: KAFKA-7698: Kafka Broker fail to 
start: ProducerFencedException throw…
URL: https://github.com/apache/kafka/pull/5992
 
 
   If ValidationType is None, also skip the check in appendEndTxnMarker 
(similar to append).
   
   Verified with existing unit tests and our daily operation.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Broker fail to start: ProducerFencedException thrown from 
> producerstatemanager.scala!checkProducerEpoch 
> --
>
> Key: KAFKA-7698
> URL: https://issues.apache.org/jira/browse/KAFKA-7698
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> During our operation of Kafka, we frequently saw this failure: 
>    There was an error in one of the threads during logs loading: 
> org.apache.kafka.common.errors.ProducerFencedException:
> {code:java}
> [06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] 
> Loading producer state from snapshot file 
> '/data/disk5/kafka/interaction_events-127/092130764817.snapshot' 
> (kafka.log.ProducerStateManager)
> [06:57:09,698] INFO [Log partition=interaction_events-127, 
> dir=/data/disk5/kafka] Completed load of log with 11 segments, log start 
> offset 91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log)
> [06:57:09,701] ERROR There was an error in one of the threads during logs 
> loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's 
> epoch is no longer valid. There is probably another producer with a newer 
> epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager)
> [06:57:09,705] INFO [ProducerStateManager 
> partition=client_interaction_events_authorid_enrichment-20] Writing producer 
> snapshot at offset 92418754384 (kafka.log.ProducerStateManager)
> [06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer 
> startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 63 
> (request epoch), 66 (server epoch)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch

2018-12-03 Thread Ming Liu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707621#comment-16707621
 ] 

Ming Liu commented on KAFKA-7698:
-

The problem seems to be coming from appendEndTxnMarker in 
ProducerStateManager.scala.

ProducerAppendInfo() has a ValidationType passed in. During initial 
bootstrapping, ValidationType is set to None (instead of Client or All).

So for append(), maybeValidateAppend() is called and the check is skipped 
during the loading phase.

But for appendEndTxnMarker(), it seems we should also skip the check. 

> Kafka Broker fail to start: ProducerFencedException thrown from 
> producerstatemanager.scala!checkProducerEpoch 
> --
>
> Key: KAFKA-7698
> URL: https://issues.apache.org/jira/browse/KAFKA-7698
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Priority: Major
>
> During our operation of Kafka, we frequently saw this failure: 
>    There was an error in one of the threads during logs loading: 
> org.apache.kafka.common.errors.ProducerFencedException:
> {code:java}
> [06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] 
> Loading producer state from snapshot file 
> '/data/disk5/kafka/interaction_events-127/092130764817.snapshot' 
> (kafka.log.ProducerStateManager)
> [06:57:09,698] INFO [Log partition=interaction_events-127, 
> dir=/data/disk5/kafka] Completed load of log with 11 segments, log start 
> offset 91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log)
> [06:57:09,701] ERROR There was an error in one of the threads during logs 
> loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's 
> epoch is no longer valid. There is probably another producer with a newer 
> epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager)
> [06:57:09,705] INFO [ProducerStateManager 
> partition=client_interaction_events_authorid_enrichment-20] Writing producer 
> snapshot at offset 92418754384 (kafka.log.ProducerStateManager)
> [06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer 
> startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 63 
> (request epoch), 66 (server epoch)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7700) AbstractConfig does not honor Properties defaults

2018-12-03 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7700?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707618#comment-16707618
 ] 

Tommy Becker commented on KAFKA-7700:
-

One option here would be to deprecate the use of {{Properties}} for configuring 
these classes, though such an approach would require a KIP. Alternatively, we 
could probably add support for defaults in {{AbstractConfig}}.
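
In the meantime, a possible caller-side workaround (a sketch only, not an API 
change) is to flatten the {{Properties}} instance, including its defaults, into 
a plain {{Map}} before handing it to the client, since {{stringPropertyNames()}} 
and {{getProperty()}} do walk the defaults chain:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

static Map<String, Object> flatten(final Properties props) {
    final Map<String, Object> configs = new HashMap<>();
    for (final String name : props.stringPropertyNames()) {   // includes keys from the defaults chain
        configs.put(name, props.getProperty(name));           // getProperty() consults defaults
    }
    return configs;
}

// usage (illustrative):
// new KafkaConsumer<>(flatten(propsWithDefaults), keyDeserializer, valueDeserializer);
{code}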

> AbstractConfig does not honor Properties defaults
> -
>
> Key: KAFKA-7700
> URL: https://issues.apache.org/jira/browse/KAFKA-7700
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.1.0
>Reporter: Tommy Becker
>Assignee: Tommy Becker
>Priority: Minor
>
> Kafka clients such as the Consumer and Producer require various configuration 
> parameters to work. In the case of the Consumer and Producer, these 
> parameters are provided by passing either a {{Map}} or 
> {{Properties}} instance to the respective constructor.
> {{Properties}} is a legacy class (akin to {{Vector}}) that adds no value 
> above {{Map}} other than the ability to wrap another 
> {{Properties}} instance that provides defaults. But Kafka negates this 
> benefit by treating the {{Properties}} instance as a {{Map}}, which only 
> works due to an unfortunate decision to have {{Properties}} extend 
> {{Hashtable}}.  Such treatment bypasses the defaults because they are only 
> consulted by {{Properties.getProperty()}}. The net result is that when 
> creating Kafka clients via {{Properties}}, any configuration from its 
> defaults is ignored.
> This has been reported several times over the years as KAFKA-1909, 
> KAFKA-2184, KAFKA-3049, and KAFKA-5514. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7700) AbstractConfig does not honor Properties defaults

2018-12-03 Thread Tommy Becker (JIRA)
Tommy Becker created KAFKA-7700:
---

 Summary: AbstractConfig does not honor Properties defaults
 Key: KAFKA-7700
 URL: https://issues.apache.org/jira/browse/KAFKA-7700
 Project: Kafka
  Issue Type: Bug
  Components: config
Affects Versions: 2.1.0
Reporter: Tommy Becker
Assignee: Tommy Becker


Kafka clients such as the Consumer and Producer require various configuration 
parameters to work. In the case of the Consumer and Producer, these parameters 
are provided by passing either a {{Map}} or {{Properties}} instance 
to the respective constructor.

{{Properties}} is a legacy class (akin to {{Vector}}) that adds no value above 
{{Map}} other than the ability to wrap another {{Properties}} 
instance that provides defaults. But Kafka negates this benefit by treating the 
{{Properties}} instance as a {{Map}}, which only works due to an unfortunate 
decision to have {{Properties}} extend {{Hashtable}}.  Such treatment bypasses 
the defaults because they are only consulted by {{Properties.getProperty()}}. 
The net result is that when creating Kafka clients via {{Properties}}, any 
configuration from its defaults is ignored.

This has been reported several times over the years as KAFKA-1909, KAFKA-2184, 
KAFKA-3049, and KAFKA-5514. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch

2018-12-03 Thread Ming Liu (JIRA)
Ming Liu created KAFKA-7698:
---

 Summary: Kafka Broker fail to start: ProducerFencedException 
thrown from producerstatemanager.scala!checkProducerEpoch 
 Key: KAFKA-7698
 URL: https://issues.apache.org/jira/browse/KAFKA-7698
 Project: Kafka
  Issue Type: Bug
Reporter: Ming Liu


During our operation of Kafka, we frequently saw this failure: 

   There was an error in one of the threads during logs loading: 
org.apache.kafka.common.errors.ProducerFencedException:

{code:java}

[06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] 
Loading producer state from snapshot file 
'/data/disk5/kafka/interaction_events-127/092130764817.snapshot' 
(kafka.log.ProducerStateManager)
[06:57:09,698] INFO [Log partition=interaction_events-127, 
dir=/data/disk5/kafka] Completed load of log with 11 segments, log start offset 
91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log)
[06:57:09,701] ERROR There was an error in one of the threads during logs 
loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's 
epoch is no longer valid. There is probably another producer with a newer 
epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager)
[06:57:09,705] INFO [ProducerStateManager 
partition=client_interaction_events_authorid_enrichment-20] Writing producer 
snapshot at offset 92418754384 (kafka.log.ProducerStateManager)
[06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer startup. 
Prepare to shutdown (kafka.server.KafkaServer)
org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is no 
longer valid. There is probably another producer with a newer epoch. 63 
(request epoch), 66 (server epoch)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7699) Improve wall-clock time punctuations

2018-12-03 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-7699:
--

 Summary: Improve wall-clock time punctuations
 Key: KAFKA-7699
 URL: https://issues.apache.org/jira/browse/KAFKA-7699
 Project: Kafka
  Issue Type: New Feature
  Components: streams
Reporter: Matthias J. Sax


Currently, wall-clock time punctuations allow scheduling periodic callbacks 
based on wall-clock time progress. The punctuation interval starts when the 
punctuation is scheduled; thus, the actual firing times are non-deterministic, 
which is what is desired for many use cases (I want a call-back in 5 minutes 
from "now").

It would be a nice improvement to also allow users to "anchor" wall-clock 
punctuations, similar to a cron job: such a punctuation would be triggered at 
"fixed" times, like the beginning of the next hour, independent of when the 
punctuation was registered.
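
For illustration, a rough sketch of today's semantics and a cron-like 
workaround that is possible with the current Processor API (the intervals, the 
{{lastSeenHour}} field and the {{flushHourlyReport}} helper are assumptions, 
and {{context}} is the {{ProcessorContext}} obtained in {{init()}}):

{code:java}
import org.apache.kafka.streams.processor.PunctuationType;

import java.time.Duration;

// Current semantics: the first punctuation fires roughly one interval after
// schedule() is called, i.e. relative to "now" rather than a fixed anchor.
context.schedule(Duration.ofHours(1), PunctuationType.WALL_CLOCK_TIME,
        timestamp -> flushHourlyReport(timestamp));

// Rough "anchored" workaround: punctuate frequently and only act when a new
// wall-clock hour boundary has been crossed.
context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
    final long hour = timestamp / Duration.ofHours(1).toMillis();
    if (hour != lastSeenHour) {       // lastSeenHour: assumed field on the Processor
        lastSeenHour = hour;
        flushHourlyReport(timestamp);
    }
});
{code}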



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-2334) Prevent HW from going back during leader failover

2018-12-03 Thread David Arthur (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur reassigned KAFKA-2334:
---

Assignee: David Arthur

> Prevent HW from going back during leader failover 
> --
>
> Key: KAFKA-2334
> URL: https://issues.apache.org/jira/browse/KAFKA-2334
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Guozhang Wang
>Assignee: David Arthur
>Priority: Major
>  Labels: reliability
>
> Consider the following scenario:
> 0. Kafka uses a replication factor of 2, with broker B1 as the leader and B2 
> as the follower. 
> 1. A producer keeps sending to Kafka with ack=-1.
> 2. A consumer repeatedly issues ListOffset requests to Kafka.
> And the following sequence:
> 0. B1's current log-end-offset (LEO) is 0, HW-offset 0; and the same for B2.
> 1. B1 receives a ProduceRequest of 100 messages, appends them to its local 
> log (LEO becomes 100) and holds the request in purgatory.
> 2. B1 receives a FetchRequest starting at offset 0 from follower B2, and 
> returns the 100 messages.
> 3. B2 appends the received messages to its local log (LEO becomes 100).
> 4. B1 receives another FetchRequest starting at offset 100 from B2, learns 
> that B2's LEO has caught up to 100, hence updates its own HW, satisfies the 
> ProduceRequest in purgatory, and sends the FetchResponse with HW 100 back to 
> B2 ASYNCHRONOUSLY.
> 5. B1 successfully sends the ProduceResponse to the producer and then fails; 
> hence the FetchResponse does not reach B2, whose HW remains 0.
> From the consumer's point of view, it could first see the latest offset of 
> 100 (from B1), then see the latest offset of 0 (from B2), and then watch the 
> latest offset gradually catch up to 100.
> This is because we use the HW to guard ListOffset and 
> Fetch-from-ordinary-consumer.
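
A hedged illustration of how a client could observe the behavior described 
above, by repeatedly asking for the latest offset around a leader failover 
(topic, partition, and consumer configuration are assumptions):

{code:java}
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

static void watchLatestOffset(final Properties consumerProps) throws InterruptedException {
    final TopicPartition tp = new TopicPartition("events", 0);
    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps)) {
        while (true) {
            // The latest offset is answered from the current leader's HW, so right after
            // a failover it can be smaller than what the previous leader reported.
            final long latest = consumer.endOffsets(Collections.singleton(tp)).get(tp);
            System.out.println("latest offset of " + tp + " = " + latest);
            Thread.sleep(1000);
        }
    }
}
{code}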



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-2334) Prevent HW from going back during leader failover

2018-12-03 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707611#comment-16707611
 ] 

ASF GitHub Bot commented on KAFKA-2334:
---

mumrah opened a new pull request #5991: KAFKA-2334 Guard against non-monotonic 
offsets in the client
URL: https://github.com/apache/kafka/pull/5991
 
 
   After a recent leader election, the leader's high-water mark might lag behind 
the offset at the beginning of the new epoch (as well as the previous leader's 
HW). This can lead to offsets going backwards from a client perspective, which 
is confusing and leads to strange behavior in some clients.
   
   This change causes Partition#fetchOffsetForTimestamp to throw an exception 
to indicate the offsets are not yet available from the leader. For new clients, 
a new OFFSET_NOT_AVAILABLE error is added. For existing clients, a 
LEADER_NOT_AVAILABLE is thrown.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Prevent HW from going back during leader failover 
> --
>
> Key: KAFKA-2334
> URL: https://issues.apache.org/jira/browse/KAFKA-2334
> Project: Kafka
>  Issue Type: Bug
>  Components: replication
>Affects Versions: 0.8.2.1
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: reliability
>
> Consider the following scenario:
> 0. Kafka uses a replication factor of 2, with broker B1 as the leader and B2 
> as the follower. 
> 1. A producer keeps sending to Kafka with ack=-1.
> 2. A consumer repeatedly issues ListOffset requests to Kafka.
> And the following sequence:
> 0. B1's current log-end-offset (LEO) is 0, HW-offset 0; and the same for B2.
> 1. B1 receives a ProduceRequest of 100 messages, appends them to its local 
> log (LEO becomes 100) and holds the request in purgatory.
> 2. B1 receives a FetchRequest starting at offset 0 from follower B2, and 
> returns the 100 messages.
> 3. B2 appends the received messages to its local log (LEO becomes 100).
> 4. B1 receives another FetchRequest starting at offset 100 from B2, learns 
> that B2's LEO has caught up to 100, hence updates its own HW, satisfies the 
> ProduceRequest in purgatory, and sends the FetchResponse with HW 100 back to 
> B2 ASYNCHRONOUSLY.
> 5. B1 successfully sends the ProduceResponse to the producer and then fails; 
> hence the FetchResponse does not reach B2, whose HW remains 0.
> From the consumer's point of view, it could first see the latest offset of 
> 100 (from B1), then see the latest offset of 0 (from B2), and then watch the 
> latest offset gradually catch up to 100.
> This is because we use the HW to guard ListOffset and 
> Fetch-from-ordinary-consumer.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2018-12-03 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707609#comment-16707609
 ] 

Ismael Juma commented on KAFKA-7697:


Marking as blocker until we understand the details.

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2018-12-03 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7697:
---
Fix Version/s: 2.1.1
   2.2.0

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2018-12-03 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7697:
---
Priority: Blocker  (was: Major)

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Blocker
> Fix For: 2.2.0, 2.1.1
>
> Attachments: threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2018-12-03 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7697?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram reassigned KAFKA-7697:
-

Assignee: Rajini Sivaram

> Possible deadlock in kafka.cluster.Partition
> 
>
> Key: KAFKA-7697
> URL: https://issues.apache.org/jira/browse/KAFKA-7697
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Gian Merlino
>Assignee: Rajini Sivaram
>Priority: Major
> Attachments: threaddump.txt
>
>
> After upgrading a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
> within a few minutes (by "locked up" I mean that all request handler threads 
> were busy, and other brokers reported that they couldn't communicate with 
> it). I restarted it a few times and it did the same thing each time. After 
> downgrading to 0.10.2.0, the broker was stable. I attached a thread dump from 
> the last attempt on 2.1.0 that shows lots of kafka-request-handler- threads 
> trying to acquire the leaderIsrUpdateLock lock in kafka.cluster.Partition.
> It jumps out that there are two threads that already have some read lock 
> (can't tell which one) and are trying to acquire a second one (on two 
> different read locks: 0x000708184b88 and 0x00070821f188): 
> kafka-request-handler-1 and kafka-request-handler-4. Both are handling a 
> produce request, and in the process of doing so, are calling 
> Partition.fetchOffsetSnapshot while trying to complete a DelayedFetch. At the 
> same time, both of those locks have writers from other threads waiting on 
> them (kafka-request-handler-2 and kafka-scheduler-6). Neither of those locks 
> appear to have writers that hold them (if only because no threads in the dump 
> are deep enough in inWriteLock to indicate that).
> ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
> readers. Is it possible that kafka-request-handler-1 and 
> kafka-request-handler-4 are each trying to read-lock the partition that is 
> currently locked by the other one, and they're both parked waiting for 
> kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
> never will, because the former two threads own read locks and aren't giving 
> them up?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application

2018-12-03 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707585#comment-16707585
 ] 

Guozhang Wang commented on KAFKA-7657:
--

[~pkleindl] could you provide the logs around the time when the stream 
application has started to report REBALANCING so we can investigate?

> Invalid reporting of stream state in Kafka streams application
> --
>
> Key: KAFKA-7657
> URL: https://issues.apache.org/jira/browse/KAFKA-7657
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Thomas Crowley
>Priority: Major
>  Labels: bug
>
> We have a streams application with 3 instances running, two of which are 
> reporting the state of REBALANCING even after they have been running for 
> days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, 
> and the kafka-consumer-groups CLI tool reports hardly any offset lag in any 
> of the partitions assigned to the REBALANCING consumers. Each partition seems 
> to be processing an equal amount of records too.
> Inspecting the state.dir on disk, it looks like the RocksDB state has been 
> built and hovers at the expected size on disk.
> This problem has persisted for us after we rebuilt our Kafka cluster and 
> reset topics + consumer groups in our dev environment.
> There is nothing in the logs (with level set to DEBUG) in both the broker or 
> the application that suggests something exceptional has happened causing the 
> application to be stuck REBALANCING.
> We are also running multiple streaming applications where this problem does 
> not exist.
> Two differences between this application and our other streaming applications 
> are:
>  * We have processing.guarantee set to exactly_once
>  * We are using a ValueTransformer which fetches from and puts data on a 
> windowed state store
> The REBALANCING state is returned from both polling the state method of our 
> KafkaStreams instance, and our custom metric which is derived from some logic 
> in a KafkaStreams.StateListener class attached via the setStateListener 
> method.
>  
> While I have provided a bit of context, before I reply with some reproducible 
> code - is there a simple way in which I can determine that my streams 
> application is in a RUNNING state without relying on the same mechanisms as 
> used above?
> Further, given that it seems like my application is actually running - could 
> this perhaps be a bug to do with how the stream state is being reported (in 
> the context of a transactional stream using the processor API)?
>  
>  
>  
>  
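
For reference, a minimal sketch of the two mechanisms mentioned in the 
description for observing the client state (the topology and configuration 
objects are assumed to exist):

{code:java}
import org.apache.kafka.streams.KafkaStreams;

KafkaStreams streams = new KafkaStreams(topology, streamsProps);

// 1) Poll the current state directly.
boolean running = streams.state() == KafkaStreams.State.RUNNING;

// 2) Register a listener for state transitions (must be set before start()).
streams.setStateListener((newState, oldState) ->
        System.out.println("streams state changed: " + oldState + " -> " + newState));

streams.start();
{code}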



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration

2018-12-03 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16707579#comment-16707579
 ] 

Guozhang Wang commented on KAFKA-7695:
--

Hello [~zirx] just to clarify on your issue, you want to set the global 
consumer's partition assignor to KafkaStream's StreamsPartitionAssignor class, 
is that right? Or is this {{StreamsPartitionAssignor}} your own customized 
partition assignor? I'm asking this because the KafkaStreams' 
StreamsPartitionAssignor does not do bootstrap loading side topics by 
throttling other topics, but the name {{StreamsPartitionAssignor}} seems to be 
indicating that you are not using a customized partition assignor.

Back to your original question, there are two aspects to the issue:

1) The global consumer inside KafkaStreams does not rely on a partition 
assignor (which is part of the rebalance protocol) to assign partitions at all; 
instead it uses {{Consumer.assign}} rather than {{Consumer.subscribe}} to 
assign topic-partitions directly, so overriding the PartitionAssignor would not 
help fix your issue (see the sketch below).

2) Since you mentioned the global consumer, which is used only for building 
global stores / GlobalKTables, I assume your left joins are all 
KStream-GlobalKTable joins. Note that Kafka Streams always bootstraps global 
stores / GlobalKTables before proceeding to stream task execution, so I'm not 
sure why you would still hit this issue of a "join window during initial 
loading".

Could you elaborate a bit more on the second point so that I can better 
understand your scenario?
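
For illustration, here is a minimal sketch of point 1) (the topic name, 
bootstrap server, and class name are hypothetical, not Kafka Streams 
internals): a consumer that uses {{assign()}} bypasses group coordination, so 
any configured partition.assignment.strategy is never invoked for assignment.

{code}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class AssignVsSubscribeSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // assign() pins partitions directly: there is no consumer-group rebalance,
            // so a configured partition assignor would never be invoked for assignment.
            consumer.assign(Collections.singletonList(new TopicPartition("some-global-topic", 0)));
            consumer.seekToBeginning(consumer.assignment());
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
{code}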

> Cannot override StreamsPartitionAssignor in configuration 
> --
>
> Key: KAFKA-7695
> URL: https://issues.apache.org/jira/browse/KAFKA-7695
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Dmitry Buykin
>Priority: Major
>  Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing property 
> partition.assignment.strategy in KStreams 2.0.1 because the streams are 
> crashing inside KafkaClientSupplier.getGlobalConsumer. This GlobalConsumer 
> works only with RangeAssignor which configured by default.
> Could be reproduced by setting up
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> StreamsPartitionAssignor.class.getName());`
> For me it looks like a bug.
> Opened a discussion here 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7697) Possible deadlock in kafka.cluster.Partition

2018-12-03 Thread Gian Merlino (JIRA)
Gian Merlino created KAFKA-7697:
---

 Summary: Possible deadlock in kafka.cluster.Partition
 Key: KAFKA-7697
 URL: https://issues.apache.org/jira/browse/KAFKA-7697
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.1.0
Reporter: Gian Merlino
 Attachments: threaddump.txt

After I upgraded a fairly busy broker from 0.10.2.0 to 2.1.0, it locked up 
within a few minutes (by "locked up" I mean that all request handler threads 
were busy and other brokers reported that they couldn't communicate with it). 
I restarted it a few times and it did the same thing each time. After 
downgrading to 0.10.2.0, the broker was stable. I have attached a thread dump 
from the last attempt on 2.1.0 that shows lots of kafka-request-handler- 
threads trying to acquire the leaderIsrUpdateLock in kafka.cluster.Partition.

It jumps out that there are two threads that already hold some read lock (can't 
tell which one) and are trying to acquire a second one (on two different read 
locks: 0x000708184b88 and 0x00070821f188): kafka-request-handler-1 and 
kafka-request-handler-4. Both are handling a produce request and, in the 
process of doing so, are calling Partition.fetchOffsetSnapshot while trying to 
complete a DelayedFetch. At the same time, both of those locks have writers 
from other threads waiting on them (kafka-request-handler-2 and 
kafka-scheduler-6). Neither of those locks appears to have a writer that holds 
it (if only because no threads in the dump are deep enough in inWriteLock to 
indicate that).

ReentrantReadWriteLock in nonfair mode prioritizes waiting writers over 
readers. Is it possible that kafka-request-handler-1 and 
kafka-request-handler-4 are each trying to read-lock the partition that is 
currently locked by the other one, and they're both parked waiting for 
kafka-request-handler-2 and kafka-scheduler-6 to get write locks, which they 
never will, because the former two threads own read locks and aren't giving 
them up?
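
To make the suspected interleaving concrete, here is a minimal, self-contained 
sketch (plain Java, not Kafka code; thread and lock names are hypothetical): 
two readers each hold one read lock, a writer then queues on each lock, and 
each reader's second read acquisition parks behind the writer queued on the 
other lock, leaving all four threads permanently stuck.

{code}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadWriteLockDeadlockSketch {

    public static void main(String[] args) throws InterruptedException {
        ReentrantReadWriteLock lockA = new ReentrantReadWriteLock(); // non-fair mode (default)
        ReentrantReadWriteLock lockB = new ReentrantReadWriteLock();
        CountDownLatch readsHeld = new CountDownLatch(2);

        Thread reader1 = new Thread(() -> {
            lockA.readLock().lock();                 // holds A's read lock
            readsHeld.countDown();
            while (!lockB.hasQueuedThreads()) {      // wait until a writer is queued on B
                Thread.yield();
            }
            lockB.readLock().lock();                 // parks behind the writer queued on B
        }, "reader-1");

        Thread reader2 = new Thread(() -> {
            lockB.readLock().lock();                 // holds B's read lock
            readsHeld.countDown();
            while (!lockA.hasQueuedThreads()) {      // wait until a writer is queued on A
                Thread.yield();
            }
            lockA.readLock().lock();                 // parks behind the writer queued on A
        }, "reader-2");

        reader1.start();
        reader2.start();
        readsHeld.await();                           // both read locks are now held

        // The writers can never acquire because the read locks are held, and the readers'
        // second read requests park behind these queued writers: nobody makes progress.
        new Thread(() -> lockA.writeLock().lock(), "writer-A").start();
        new Thread(() -> lockB.writeLock().lock(), "writer-B").start();
    }
}
{code}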



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7696) kafka-delegation-tokens.sh using a config file that contains security.protocol=SASL_PLAINTEXT throws OutOfMemoryError if it tries to connect to an SSL-enabled secured broker

2018-12-03 Thread Attila Sasvari (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Attila Sasvari reassigned KAFKA-7696:
-

Assignee: Viktor Somogyi

> kafka-delegation-tokens.sh using a config file that contains 
> security.protocol=SASL_PLAINTEXT throws OutOfMemoryError if it tries to 
> connect to an SSL-enabled secured broker
> -
>
> Key: KAFKA-7696
> URL: https://issues.apache.org/jira/browse/KAFKA-7696
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 1.1.0, 2.0.0
>Reporter: Attila Sasvari
>Assignee: Viktor Somogyi
>Priority: Major
>
> When the command-config file of kafka-delegation-tokens contain 
> security.protocol=SASL_PLAINTEXT instead of SASL_SSL (i.e. due to a user 
> error), the process throws a java.lang.OutOfMemoryError upon connection 
> attempt to a secured (i.e. Kerberized, SSL-enabled) Kafka broker.
> {code}
> [2018-12-03 11:27:13,221] ERROR Uncaught exception in thread 
> 'kafka-admin-client-thread | adminclient-1': 
> (org.apache.kafka.common.utils.KafkaThread)
> java.lang.OutOfMemoryError: Java heap space
>   at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
>   at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
>   at 
> org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
>   at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:407)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:497)
>   at 
> org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:207)
>   at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
>   at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:533)
>   at org.apache.kafka.common.network.Selector.poll(Selector.java:468)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
>   at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1125)
>   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7696) kafka-delegation-tokens.sh using a config file that contains security.protocol=SASL_PLAINTEXT throws OutOfMemoryError if it tries to connect to an SSL-enabled secured broker

2018-12-03 Thread Attila Sasvari (JIRA)
Attila Sasvari created KAFKA-7696:
-

 Summary: kafka-delegation-tokens.sh using a config file that 
contains security.protocol=SASL_PLAINTEXT throws OutOfMemoryError if it tries 
to connect to an SSL-enabled secured broker
 Key: KAFKA-7696
 URL: https://issues.apache.org/jira/browse/KAFKA-7696
 Project: Kafka
  Issue Type: Bug
  Components: tools
Affects Versions: 2.0.0, 1.1.0
Reporter: Attila Sasvari


When the command-config file of kafka-delegation-tokens contains 
security.protocol=SASL_PLAINTEXT instead of SASL_SSL (e.g. due to a user 
error), the process throws a java.lang.OutOfMemoryError upon a connection 
attempt to a secured (i.e. Kerberized, SSL-enabled) Kafka broker.

{code}
[2018-12-03 11:27:13,221] ERROR Uncaught exception in thread 
'kafka-admin-client-thread | adminclient-1': 
(org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
at 
org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
at 
org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveResponseOrToken(SaslClientAuthenticator.java:407)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.receiveKafkaResponse(SaslClientAuthenticator.java:497)
at 
org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:207)
at 
org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:533)
at org.apache.kafka.common.network.Selector.poll(Selector.java:468)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
at 
org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1125)
at java.lang.Thread.run(Thread.java:745)
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration

2018-12-03 Thread Dmitry Buykin (JIRA)
Dmitry Buykin created KAFKA-7695:


 Summary: Cannot override StreamsPartitionAssignor in configuration 
 Key: KAFKA-7695
 URL: https://issues.apache.org/jira/browse/KAFKA-7695
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.1, 2.0.0
Reporter: Dmitry Buykin


Cannot override StreamsPartitionAssignor by changing the property 
partition.assignment.strategy in KStreams 2.0.1, because the streams crash 
inside KafkaClientSupplier.getGlobalConsumer. This global consumer works only 
with the RangeAssignor, which is configured by default.

It can be reproduced by setting
`props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
StreamsPartitionAssignor.class.getName());`
(a fuller sketch follows below). To me it looks like a bug.

Opened a discussion here 

https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
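
A fuller reproduction might look like the sketch below (the application id, 
bootstrap server, and topic name are hypothetical assumptions; the topology 
includes a GlobalKTable so that the global consumer mentioned above gets 
created):

{code}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;

public class AssignorOverrideRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "assignor-override-repro");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // The override in question (per the report above, the global consumer picks it up).
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                  StreamsPartitionAssignor.class.getName());

        StreamsBuilder builder = new StreamsBuilder();
        builder.globalTable("side-topic");  // forces creation of the global consumer

        // Startup is expected to fail while the global consumer is being created,
        // as reported above for KafkaClientSupplier.getGlobalConsumer.
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
{code}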

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7695) Cannot override StreamsPartitionAssignor in configuration

2018-12-03 Thread Dmitry Buykin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7695?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706929#comment-16706929
 ] 

Dmitry Buykin commented on KAFKA-7695:
--

Why I need a customized PartitionAssignor:

I need a customized PartitionAssignor to throttle throughput on some topics. In 
our application we have a quite large join window during initial loading, so my 
idea was to load data from the side topics of the left joins faster (or to slow 
down the base stream) to reduce the number of duplicates. This is mainly 
because we have ~7 left joins in our processing pipeline for enriching data. 
With billions of events it becomes a problem, as we could have up to 128 (2^7) 
dupes for the same event in the worst case, when a matched event has not yet 
been loaded into RocksDB from the side streams.

If there are other ways to throttle processing speed on some streams, please 
let me know.

> Cannot override StreamsPartitionAssignor in configuration 
> --
>
> Key: KAFKA-7695
> URL: https://issues.apache.org/jira/browse/KAFKA-7695
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0, 2.0.1
>Reporter: Dmitry Buykin
>Priority: Major
>  Labels: configuration
>
> Cannot override StreamsPartitionAssignor by changing property 
> partition.assignment.strategy in KStreams 2.0.1 because the streams are 
> crashing inside KafkaClientSupplier.getGlobalConsumer. This GlobalConsumer 
> works only with RangeAssignor which configured by default.
> Could be reproduced by setting up
> `props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, 
> StreamsPartitionAssignor.class.getName());`
> For me it looks like a bug.
> Opened a discussion here 
> https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1543395977453700
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7657) Invalid reporting of stream state in Kafka streams application

2018-12-03 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706877#comment-16706877
 ] 

Patrik Kleindl commented on KAFKA-7657:
---

[~guozhang] It seems we are seeing the same behaviour here in one of our 
environments.

The version is 2.0.0-cp1, and EOS is not enabled.

The stream applications each run on two hosts; on each host I currently see 
one (but not the same) stream application reporting REBALANCING.

I have suggested that the customer open a Confluent support ticket.

> Invalid reporting of stream state in Kafka streams application
> --
>
> Key: KAFKA-7657
> URL: https://issues.apache.org/jira/browse/KAFKA-7657
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Thomas Crowley
>Priority: Major
>  Labels: bug
>
> We have a streams application with 3 instances running, two of which are 
> reporting the state of REBALANCING even after they have been running for 
> days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, 
> and the kafka-consumer-groups CLI tool reports hardly any offset lag in any 
> of the partitions assigned to the REBALANCING consumers. Each partition seems 
> to be processing an equal amount of records too.
> Inspecting the state.dir on disk, it looks like the RocksDB state has been 
> built and hovers at the expected size on disk.
> This problem has persisted for us after we rebuilt our Kafka cluster and 
> reset topics + consumer groups in our dev environment.
> There is nothing in the logs (with level set to DEBUG) in both the broker or 
> the application that suggests something exceptional has happened causing the 
> application to be stuck REBALANCING.
> We are also running multiple streaming applications where this problem does 
> not exist.
> Two differences between this application and our other streaming applications 
> are:
>  * We have processing.guarantee set to exactly_once
>  * We are using a ValueTransformer which fetches from and puts data on a 
> windowed state store
> The REBALANCING state is returned from both polling the state method of our 
> KafkaStreams instance, and our custom metric which is derived from some logic 
> in a KafkaStreams.StateListener class attached via the setStateListener 
> method.
>  
> While I have provided a bit of context, before I reply with some reproducible 
> code - is there a simple way in which I can determine that my streams 
> application is in a RUNNING state without relying on the same mechanisms as 
> used above?
> Further, given that it seems like my application is actually running - could 
> this perhaps be a bug to do with how the stream state is being reported (in 
> the context of a transactional stream using the processor API)?
>  
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2018-12-03 Thread Robin Van Praet (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706826#comment-16706826
 ] 

Robin Van Praet commented on KAFKA-5146:


Yes, as a workaround we did indeed exclude the dependency.

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael Noll
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5146) Kafka Streams: remove compile dependency on connect-json

2018-12-03 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16706806#comment-16706806
 ] 

Matthias J. Sax commented on KAFKA-5146:


I agree that we should get rid of the dependency :)

As a workaround, you could exclude the dependency. IIRC, Maven lets you mark a 
dependency as "provided", which keeps it from being pulled in (not sure if you 
use Maven). Because Kafka Streams itself does not actually depend on it (only 
the unused example code does), it should be safe to exclude it.

> Kafka Streams: remove compile dependency on connect-json
> 
>
> Key: KAFKA-5146
> URL: https://issues.apache.org/jira/browse/KAFKA-5146
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.0, 0.10.2.0, 0.10.2.1
>Reporter: Michael Noll
>Priority: Minor
>
> We currently have a compile-dependency on `connect-json`:
> {code}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>connect-json</artifactId>
>   <version>0.10.2.0</version>
>   <scope>compile</scope>
> </dependency>
> {code}
> The snippet above is from the generated POM of Kafka Streams as of 0.10.2.0 
> release.
> AFAICT the only reason for that is because the Kafka Streams *examples* 
> showcase some JSON processing, but that’s it.
> First and foremost, we should remove the connect-json dependency, and also 
> figure out a way to set up / structure the examples so that we can 
> continue showcasing JSON support.  Alternatively, we could consider removing 
> the JSON example (but I don't like that, personally).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)