[jira] [Commented] (KAFKA-9236) Confused log after using CLI scripts to produce messages

2019-11-26 Thread Xiang Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983216#comment-16983216
 ] 

Xiang Zhang commented on KAFKA-9236:


OK, now I understand. But if the producer close does not time out, what's the 
point of informing users of this value?

> Confused log after using CLI scripts to produce messages
> 
>
> Key: KAFKA-9236
> URL: https://issues.apache.org/jira/browse/KAFKA-9236
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiang Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9236) Confused log after using CLI scripts to produce messages

2019-11-26 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983211#comment-16983211
 ] 

huxihx commented on KAFKA-9236:
---

[~iamabug] It is saying the producer is closing with the specified timeout 
value `Long.MAX_VALUE`, which means the close has no timeout at all.
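For readers landing here from the log line: a minimal sketch of why that value 
shows up (an illustration against the public producer API, not the client's 
source). The no-argument `KafkaProducer#close()` waits as long as needed to 
flush outstanding records, which the client reports as a timeout of 
`Long.MAX_VALUE` ms:

{code:java}
// Illustration only (not the client's source): closing without an explicit
// timeout is equivalent to passing Long.MAX_VALUE, i.e. "wait as long as it
// takes to flush outstanding records", which is the value the log reports.
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class CloseTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // producer.close() behaves the same as the explicit call below,
        // which is what produces timeoutMillis = 9223372036854775807.
        producer.close(Duration.ofMillis(Long.MAX_VALUE));
    }
}
{code}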

> Confused log after using CLI scripts to produce messages
> 
>
> Key: KAFKA-9236
> URL: https://issues.apache.org/jira/browse/KAFKA-9236
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiang Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9236) Confused log after using CLI scripts to produce messages

2019-11-26 Thread Xiang Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983197#comment-16983197
 ] 

Xiang Zhang commented on KAFKA-9236:


Not sure how to add a picture, here is a screenshot: 
[https://tva1.sinaimg.cn/large/006y8mN6ly1g9cl2ytr86j31h5079dh3.jpg]

What does the last log mean? [~huxi_2b] 


{code:java}
19/11/27 11:26:14 INFO producer.KafkaProducer: [Producer 
clientId=console-producer] Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms
{code}

> Confused log after using CLI scripts to produce messages
> 
>
> Key: KAFKA-9236
> URL: https://issues.apache.org/jira/browse/KAFKA-9236
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiang Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9236) Confused log after using CLI scripts to produce messages

2019-11-26 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983195#comment-16983195
 ] 

huxihx commented on KAFKA-9236:
---

Could you briefly describe the problem you ran into?

> Confused log after using CLI scripts to produce messages
> 
>
> Key: KAFKA-9236
> URL: https://issues.apache.org/jira/browse/KAFKA-9236
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Xiang Zhang
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-11-26 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983176#comment-16983176
 ] 

Sophie Blee-Goldman commented on KAFKA-9225:


OK, that makes sense. (I agree, but didn't remember whether we had agreed we 
needed to wait for 3.0 to bump Rocks past 6.0 or still planned to do it in 2.5.)

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Major
>  Labels: incompatible
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download Kafka latest source code
> 2. Build it with gradle
> 3. Run 
> [streamDemo|https://kafka.apache.org/23/documentation/streams/quickstart]
> When running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message:
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni1377754636857652484.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
>  *Analysis:*
> This issue is caused by rocksdbjni-5.18.3.jar, which doesn't come with 
> aarch64 native support. Replacing rocksdbjni-5.18.3.jar with 
> rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] fixes 
> this problem.
> Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 
> and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to 
> 6.3.6 upstream? If there are any tests that should be executed, please 
> kindly point me to them. Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-11-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983156#comment-16983156
 ] 

Matthias J. Sax commented on KAFKA-9225:


Well, as we planned to wait for the AK `3.0.0` release to bump to RocksDB 6+, 
even if it's a lot, it should be OK. It's a major release; we document it (or 
point to the RocksDB docs) and tell people upfront. I don't think we can do 
anything else? \cc [~bbejeck] [~vvcephei] [~guozhang]

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Major
>  Labels: incompatible
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download Kafka latest source code
> 2. Build it with gradle
> 3. Run 
> [streamDemo|https://kafka.apache.org/23/documentation/streams/quickstart]
> When running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message:
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni1377754636857652484.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
>  *Analysis:*
> This issue is caused by rocksdbjni-5.18.3.jar, which doesn't come with 
> aarch64 native support. Replacing rocksdbjni-5.18.3.jar with 
> rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] fixes 
> this problem.
> Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 
> and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to 
> 6.3.6 upstream? If there are any tests that should be executed, please 
> kindly point me to them. Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9238) Flaky Test UncleanLeaderElectionTest#testUncleanLeaderElectionEnabledByTopicOverride

2019-11-26 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9238:
--

 Summary: Flaky Test 
UncleanLeaderElectionTest#testUncleanLeaderElectionEnabledByTopicOverride
 Key: KAFKA-9238
 URL: https://issues.apache.org/jira/browse/KAFKA-9238
 Project: Kafka
  Issue Type: Bug
  Components: core, unit tests
Reporter: Matthias J. Sax


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/26843/testReport/junit/kafka.integration/UncleanLeaderElectionTest/testUncleanLeaderElectionEnabledByTopicOverride/]
{quote}java.lang.AssertionError: Consumed more records than expected 
expected:<1> but was:<2>
 at org.junit.Assert.fail(Assert.java:89)
 at org.junit.Assert.failNotEquals(Assert.java:835)
 at org.junit.Assert.assertEquals(Assert.java:647)
 at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1344)
 at kafka.integration.UncleanLeaderElectionTest.consumeAllMessages(UncleanLeaderElectionTest.scala:285)
 at kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionEnabled(UncleanLeaderElectionTest.scala:182)
 at kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionEnabledByTopicOverride(UncleanLeaderElectionTest.scala:139){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9237) Flaky Test UncleanLeaderElectionTest#testUncleanLeaderElectionDisabled

2019-11-26 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9237:
--

 Summary: Flaky Test 
UncleanLeaderElectionTest#testUncleanLeaderElectionDisabled
 Key: KAFKA-9237
 URL: https://issues.apache.org/jira/browse/KAFKA-9237
 Project: Kafka
  Issue Type: Bug
  Components: core, unit tests
Reporter: Matthias J. Sax


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/26843/testReport/junit/kafka.integration/UncleanLeaderElectionTest/testUncleanLeaderElectionDisabled/]
{quote}org.scalatest.exceptions.TestFailedException: Inconsistent metadata 
after first server startup
 at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:530)
 at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
 at org.scalatest.Assertions$class.fail(Assertions.scala:1091)
 at org.scalatest.Assertions$.fail(Assertions.scala:1389)
 at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:842)
 at kafka.integration.UncleanLeaderElectionTest.verifyUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:256)
 at kafka.integration.UncleanLeaderElectionTest.testUncleanLeaderElectionDisabled(UncleanLeaderElectionTest.scala:124){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9236) Confused log after using CLI scripts to produce messages

2019-11-26 Thread Xiang Zhang (Jira)
Xiang Zhang created KAFKA-9236:
--

 Summary: Confused log after using CLI scripts to produce messages
 Key: KAFKA-9236
 URL: https://issues.apache.org/jira/browse/KAFKA-9236
 Project: Kafka
  Issue Type: Improvement
Reporter: Xiang Zhang






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9235) Transaction state not cleaned up following StopReplica request

2019-11-26 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9235:
--

 Summary: Transaction state not cleaned up following StopReplica 
request
 Key: KAFKA-9235
 URL: https://issues.apache.org/jira/browse/KAFKA-9235
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


When the broker receives a StopReplica request from the controller for one of 
the transaction state topics, we should make sure to clean up existing state 
in the TransactionCoordinator for the corresponding partition. We have similar 
logic already for the group coordinator.
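A rough sketch of the intended shape, with placeholder type and method names 
(this is an illustration of the description above, not the broker's actual 
API):

{code:scala}
import org.apache.kafka.common.TopicPartition

// Placeholder interface for illustration only.
trait CoordinatorCleanup { def removeStateFor(partition: Int): Unit }

class StopReplicaHandler(groupCoordinator: CoordinatorCleanup,
                         txnCoordinator: CoordinatorCleanup) {
  def onStopReplica(tp: TopicPartition): Unit = tp.topic match {
    // existing behavior for the group coordinator's topic
    case "__consumer_offsets" => groupCoordinator.removeStateFor(tp.partition)
    // the missing, analogous cleanup for transaction state
    case "__transaction_state" => txnCoordinator.removeStateFor(tp.partition)
    case _ => // ordinary data partitions need no coordinator cleanup
  }
}
{code}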



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9232) Coordinator new member heartbeat completion does not work for JoinGroup v3

2019-11-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983116#comment-16983116
 ] 

ASF GitHub Bot commented on KAFKA-9232:
---

ableegoldman commented on pull request #7753: KAFKA-9232: Coordinator new 
member heartbeat completion does not work for JoinGroup v3
URL: https://github.com/apache/kafka/pull/7753
 
 
   The v3 JoinGroup logic does not properly complete the initial heartbeat for 
new members, so the heartbeat expires only after the static 5-minute timeout. 
The core issue is in the `shouldKeepAlive` method, which returns false when it 
should return true.
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Coordinator new member heartbeat completion does not work for JoinGroup v3
> --
>
> Key: KAFKA-9232
> URL: https://issues.apache.org/jira/browse/KAFKA-9232
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> For older versions of the JoinGroup API, the coordinator implements a static 
> timeout for new members of 5 minutes. This timeout is implemented using the 
> heartbeat purgatory and we expect that the delayed operation will be force 
> completed if the member successfully joins. This is implemented in 
> GroupCoordinator with the following logic:
> {code:scala}
> group.maybeInvokeJoinCallback(member, joinResult)
> completeAndScheduleNextHeartbeatExpiration(group, member)
> member.isNew = false
> {code}
> However, heartbeat completion depends on this check:
> {code:scala}
>   def shouldKeepAlive(deadlineMs: Long): Boolean = {
> if (isAwaitingJoin)
>   !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > 
> deadlineMs
> else awaitingSyncCallback != null ||
>   latestHeartbeat + sessionTimeoutMs > deadlineMs
>   }
> {code}
> Since we invoke the join callback first, we will fall to the second branch. 
> This will only return true when the latest heartbeat plus session timeout 
> exceeds the deadline. The deadline in this case depends only on the 
> statically configured new member timeout, which means the heartbeat cannot 
> complete until about 5 minutes have passed. If the member falls out of the 
> group before then, the heartbeat ultimately expires, which may trigger a 
> spurious rebalance.
> Newer versions of the protocol are not affected by this bug because we return 
> immediately the first time a member joins the group.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9234) Consider using @Nullable and @Nonnull annotations

2019-11-26 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9234:
--

 Summary: Consider using @Nullable and @Nonnull annotations
 Key: KAFKA-9234
 URL: https://issues.apache.org/jira/browse/KAFKA-9234
 Project: Kafka
  Issue Type: Improvement
  Components: admin, clients, consumer, KafkaConnect, producer, 
streams, streams-test-utils
Reporter: Matthias J. Sax


Java 7 was dropped some time ago, and we might want to consider using Java 8 
`@Nullable` and `@Nonnull` annotations for all public-facing APIs instead of 
documenting nullability in JavaDocs only.

This ticket should be broken down into a series of smaller PRs to keep the 
scope of each PR contained, allowing for more effective reviews.
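As a small illustration of what this could look like (assuming the JSR-305 
`javax.annotation` variants; the ticket does not fix a specific annotation 
library, and `ExampleClientApi` is a made-up interface):

{code:java}
import javax.annotation.Nonnull;
import javax.annotation.Nullable;

public interface ExampleClientApi {
    // The nullability contract becomes machine-checkable by IDEs and
    // static analysis, instead of living only in the JavaDoc text.
    @Nullable
    String leaderHost(@Nonnull String topic, int partition);
}
{code}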



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-26 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983091#comment-16983091
 ] 

John Roesler commented on KAFKA-8769:
-

Yes, that is a problem. It’s why I mentioned the “low-traffic” problem in the 
description. In practice, something like what you’re doing is probably 
unavoidable.

It seems like this ticket captures an issue that’s particularly important for 
IoT use cases. 

I agree with Matthias. As useful as this improvement would be, it seems like 
we’re still waiting for inspiration for how it could be incorporated 
holistically into the API. Per-key tracking is certainly overkill for many use 
cases, and probably wouldn’t be worth the overhead when not needed. On top of 
that, we have the low-traffic problem. What we need is a design that solves 
these problems without adding a lot of complexity to the API. 

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9049) TopicCommandWithAdminClientTest should use mocks

2019-11-26 Thread Dhiraj Dwarapudi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9049?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983066#comment-16983066
 ] 

Dhiraj Dwarapudi commented on KAFKA-9049:
-

I am a new contributor and I would like to start with this ticket. Can I go 
ahead and assign this to myself?

> TopicCommandWithAdminClientTest should use mocks
> 
>
> Key: KAFKA-9049
> URL: https://issues.apache.org/jira/browse/KAFKA-9049
> Project: Kafka
>  Issue Type: Improvement
>  Components: unit tests
>Affects Versions: 2.5.0
>Reporter: Viktor Somogyi-Vass
>Priority: Minor
>  Labels: easy, newbie, newbie++
>
> The {{TopicCommandWithAdminClientTest}} class currently sets up a few brokers 
> for every test case which is wasteful and slow. We should improve it by 
> mocking out the broker behavior (maybe use {{MockAdminClient}}?). 
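For reference, a hedged sketch of that direction (`MockAdminClient` ships in 
the clients test jar; the exact constructor may differ between versions, and 
the class and method names below are illustrative):

{code:java}
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.MockAdminClient;
import org.apache.kafka.common.Node;

public class TopicCommandMockSetup {
    // Build an AdminClient backed by an in-memory mock instead of
    // starting real brokers for every test case.
    static AdminClient mockedAdmin() {
        Node broker = new Node(0, "localhost", 9092);
        List<Node> brokers = Collections.singletonList(broker);
        return new MockAdminClient(brokers, broker); // broker 0 as controller
    }
}
{code}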



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-11-26 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983064#comment-16983064
 ] 

Sophie Blee-Goldman commented on KAFKA-9225:


Thanks for generating the compatibility report! Unfortunately it looks like 
Rocks has made a _lot_ of breaking changes in the APIs exposed to Streams 
users, and a number of them are likely pretty commonly used...

[~mjsax] [~cadonna] any thoughts? 

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Major
>  Labels: incompatible
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download Kafka latest source code
> 2. Build it with gradle
> 3. Run 
> [streamDemo|https://kafka.apache.org/23/documentation/streams/quickstart]
> When running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message:
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni1377754636857652484.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
>  *Analysis:*
> This issue is caused by rocksdbjni-5.18.3.jar, which doesn't come with 
> aarch64 native support. Replacing rocksdbjni-5.18.3.jar with 
> rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] fixes 
> this problem.
> Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 
> and 6.3.6. The result is 81.8%. So is it possible to upgrade rocksdbjni to 
> 6.3.6 upstream? If there are any tests that should be executed, please 
> kindly point me to them. Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-26 Thread Alessandro Tagliapietra (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983045#comment-16983045
 ] 

Alessandro Tagliapietra commented on KAFKA-8769:


Is it a problem if a key's "stream time" never advances? If there isn't any 
new data, I think that's the expected behavior. Also, if the store changelog 
topic has compact,delete as its cleanup policy, the user can decide how long 
to keep an unused key's stream time. We've currently implemented it that way 
and it seems to be working fine.
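For concreteness, a hedged sketch of a user-level, per-key variant along those 
lines (all names are illustrative; the "per-key-stream-time" store must be 
created and attached to the topology separately, and its changelog can use the 
compact,delete policy mentioned above):

{code:java}
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Tracks "stream time" per key in a state store and drops a record only if
// it is late relative to ITS OWN key's stream time, not the partition's.
public class PerKeyStreamTime<K, V> implements Transformer<K, V, KeyValue<K, V>> {
    private final long graceMs;
    private ProcessorContext ctx;
    private KeyValueStore<K, Long> maxTimestamps;

    public PerKeyStreamTime(long graceMs) { this.graceMs = graceMs; }

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.ctx = context;
        // store name is an assumption; register it on the topology yourself
        this.maxTimestamps =
            (KeyValueStore<K, Long>) context.getStateStore("per-key-stream-time");
    }

    @Override
    public KeyValue<K, V> transform(K key, V value) {
        long ts = ctx.timestamp();
        Long seen = maxTimestamps.get(key);
        long keyStreamTime = seen == null ? ts : Math.max(seen, ts);
        maxTimestamps.put(key, keyStreamTime);
        // returning null drops the record (it is late for this key only)
        return ts + graceMs < keyStreamTime ? null : KeyValue.pair(key, value);
    }

    @Override
    public void close() {}
}
{code}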

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983044#comment-16983044
 ] 

Matthias J. Sax commented on KAFKA-8769:


Thanks for the details.

While there might be corner cases in which key-based tracking seems 
advantageous, I am not sure it is a good general solution. Assume that your 
key space changes over time: in this case, some keys might be stuck forever, 
as their "stream time" no longer advances.

Having a "switch" to turn it on/off would complicate the code base further... 
If more people run into the same problem and it's really a general issue, we 
can of course consider doing it.

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7987) a broker's ZK session may die on transient auth failure

2019-11-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983015#comment-16983015
 ] 

ASF GitHub Bot commented on KAFKA-7987:
---

parafiend commented on pull request #7751: KAFKA-7987: Reinitialize 
ZookeeperClient after auth failures
URL: https://github.com/apache/kafka/pull/7751
 
 
   Schedule a client reinitialization after encountering a Zookeeper auth
   failure. This allows for reconnections when transient network errors are
   encountered during connection establishment.
   
   The Zookeeper client doesn't expose details of the auth failure, so we
   can't determine whether an error is retriable; therefore, all auth
   failures are retried.
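   A hedged, self-contained sketch of that retry idea (illustrative names 
only; the actual patch works inside kafka.zookeeper.ZooKeeperClient):

{code:scala}
import java.util.concurrent.{Executors, TimeUnit}

// Illustration only: after a ZooKeeper AUTH_FAILED event, schedule a
// session re-initialization instead of giving up, since the failure may
// be a transient KDC or network problem.
object ZkAuthFailureRetry {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()

  def onAuthFailure(reinitialize: Runnable, backoffMs: Long): Unit = {
    // ZooKeeper does not say whether the auth failure is retriable,
    // so every auth failure is retried after a backoff.
    scheduler.schedule(reinitialize, backoffMs, TimeUnit.MILLISECONDS)
  }
}
{code}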
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
   Tested in a local docker environment by using iptables to block 
communication with ZK and Kerberos nodes. With this change, after an auth 
failure due to a Kerberos timeout, the client continued retrying until 
communication was reopened and the connection was successfully re-established.
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> a broker's ZK session may die on transient auth failure
> ---
>
> Key: KAFKA-7987
> URL: https://issues.apache.org/jira/browse/KAFKA-7987
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> After a transient network issue, we saw the following log in a broker.
> {code:java}
> [23:37:02,102] ERROR SASL authentication with Zookeeper Quorum member failed: 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Server not found in Kerberos database (7))]) occurred when 
> evaluating Zookeeper Quorum Member's received SASL token. Zookeeper Client 
> will go to AUTH_FAILED state. (org.apache.zookeeper.ClientCnxn)
> [23:37:02,102] ERROR [ZooKeeperClient] Auth failed. 
> (kafka.zookeeper.ZooKeeperClient)
> {code}
> The network issue prevented the broker from communicating with ZK. The 
> broker's ZK session then expired, but the broker didn't know that yet since 
> it couldn't establish a connection to ZK. When the network was back, the 
> broker tried to establish a connection to ZK, but failed due to an auth 
> failure (likely due to a transient KDC issue). The current logic just 
> ignores the auth failure without trying to create a new ZK session. The 
> broker will then be permanently in a state where it's alive but not 
> registered in ZK.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-11-26 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16983002#comment-16983002
 ] 

Andrew Olson commented on KAFKA-9233:
-

Set priority to minor since it is easily worked around by using a Set instead 
of a List or otherwise being smarter about how the collection of TopicPartition 
values is gathered.
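A minimal sketch of that workaround (illustrative names; `beginningOffsets` 
accepts any `Collection<TopicPartition>`):

{code:java}
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookup {
    // De-duplicate before calling the offset APIs so the internal
    // Collectors.toMap never sees the same TopicPartition twice.
    static Map<TopicPartition, Long> safeBeginningOffsets(
            KafkaConsumer<?, ?> consumer, Collection<TopicPartition> tps) {
        Set<TopicPartition> unique = new LinkedHashSet<>(tps);
        return consumer.beginningOffsets(unique);
    }
}
{code}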

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Priority: Minor
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this was likely introduced by KAFKA-7831. 
> The exception is not thrown in Kafka 2.2.1, where the duplicated 
> TopicPartition values are silently ignored. Either we should document the 
> possibility of this exception (probably wrapping it in a Kafka exception 
> class indicating invalid client API usage), or restore the previous behavior 
> where the duplicates were harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-11-26 Thread Andrew Olson (Jira)
Andrew Olson created KAFKA-9233:
---

 Summary: Kafka consumer throws undocumented IllegalStateException
 Key: KAFKA-9233
 URL: https://issues.apache.org/jira/browse/KAFKA-9233
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.3.0
Reporter: Andrew Olson


If the provided collection of TopicPartition instances contains any duplicates, 
an IllegalStateException not documented in the javadoc is thrown by internal 
Java stream code when calling KafkaConsumer#beginningOffsets or 
KafkaConsumer#endOffsets.

The stack trace looks like this,

{noformat}
java.lang.IllegalStateException: Duplicate key -2
at 
java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1254)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
{noformat}

{noformat}
java.lang.IllegalStateException: Duplicate key -1
at 
java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
at java.util.HashMap.merge(HashMap.java:1254)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
at 
org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
{noformat}

Looking at the code, it appears this was likely introduced by KAFKA-7831. The 
exception is not thrown in Kafka 2.2.1, where the duplicated TopicPartition 
values are silently ignored. Either we should document the possibility of this 
exception (probably wrapping it in a Kafka exception class indicating invalid 
client API usage), or restore the previous behavior where the duplicates were 
harmless.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9220) TimeoutException when using kafka-preferred-replica-election

2019-11-26 Thread sats (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982813#comment-16982813
 ] 

sats commented on KAFKA-9220:
-

Hi [~huxi_2b]

I think as per the 
[doc|https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-Howtousethetool?.6],
 the tool is supposed to take --timeout as a param, but in the code it is a 
hardcoded initialization:

def main(args: Array[String]): Unit = {
  val timeout = 3
  run(args, timeout)
}

So, do we need a KIP for this? I think a pull request with the fix should be 
sufficient; let me know so that I can create a pull request for it.
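For illustration, one minimal shape the change could take (a hedged sketch 
only; the real tool has a proper option parser, and the 30000 default below 
is an assumption based on the "30 second" behavior described in the issue):

{code:scala}
// Sketch only: read --timeout from the arguments instead of hardcoding it.
// run(...) is the command's existing entry point.
def main(args: Array[String]): Unit = {
  val timeout: Int = args.sliding(2).collectFirst {
    case Array("--timeout", v) => v.toInt
  }.getOrElse(30000) // assumed default, matching today's ~30s behavior
  run(args, timeout)
}
{code}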

 

> TimeoutException when using kafka-preferred-replica-election
> 
>
> Key: KAFKA-9220
> URL: https://issues.apache.org/jira/browse/KAFKA-9220
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.0
>Reporter: Or Shemesh
>Priority: Major
>
> When running kafka-preferred-replica-election --bootstrap-server xxx:9092
> I'm getting this error:
> Timeout waiting for election results
> Exception in thread "main" kafka.common.AdminCommandFailedException
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$AdminClientCommand.electPreferredLeaders(PreferredReplicaLeaderElectionCommand.scala:246)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$.run(PreferredReplicaLeaderElectionCommand.scala:78)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$.main(PreferredReplicaLeaderElectionCommand.scala:42)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand.main(PreferredReplicaLeaderElectionCommand.scala)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
>  
> Because we have a big cluster, getting all the data from ZooKeeper takes 
> more than 30 seconds.
>  
> After searching the code I saw that the 30 seconds is hard-coded; can you 
> enable us to set the timeout as a parameter?
> [https://github.com/confluentinc/kafka/blob/master/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9220) TimeoutException when using kafka-preferred-replica-election

2019-11-26 Thread sats (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982813#comment-16982813
 ] 

sats edited comment on KAFKA-9220 at 11/26/19 7:57 PM:
---

Hi [~huxi_2b]

I think as per the 
[doc|https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-Howtousethetool?.6],
 the tool is supposed to take --timeout as a param, but in the code it is a 
hardcoded initialization:

def main(args: Array[String]): Unit = {
  val timeout = 3
  run(args, timeout)
}

So, do we need a KIP for this? I think a pull request with the fix should be 
sufficient; let me know so that I can create a pull request for it.

 


was (Author: sbellapu):
Hi [~huxi_2b]

I think as per the [doc|#Replicationtools-Howtousethetool?.6], the tool is 
supposed to take --timeout as a param, but in the code it is a hardcoded 
initialization:

def main(args: Array[String]): Unit = {
  val timeout = 3
  run(args, timeout)
}

So, do we need a KIP for this? I think a pull request with the fix should be 
sufficient; let me know so that I can create a pull request for it.

 

> TimeoutException when using kafka-preferred-replica-election
> 
>
> Key: KAFKA-9220
> URL: https://issues.apache.org/jira/browse/KAFKA-9220
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.0
>Reporter: Or Shemesh
>Priority: Major
>
> When running kafka-preferred-replica-election --bootstrap-server xxx:9092
> I'm getting this error:
> Timeout waiting for election results
> Exception in thread "main" kafka.common.AdminCommandFailedException
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$AdminClientCommand.electPreferredLeaders(PreferredReplicaLeaderElectionCommand.scala:246)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$.run(PreferredReplicaLeaderElectionCommand.scala:78)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$.main(PreferredReplicaLeaderElectionCommand.scala:42)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand.main(PreferredReplicaLeaderElectionCommand.scala)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
>  
> Because we have a big cluster, getting all the data from ZooKeeper takes 
> more than 30 seconds.
>  
> After searching the code I saw that the 30 seconds is hard-coded; can you 
> enable us to set the timeout as a parameter?
> [https://github.com/confluentinc/kafka/blob/master/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9220) TimeoutException when using kafka-preferred-replica-election

2019-11-26 Thread sats (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9220?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16982813#comment-16982813
 ] 

sats edited comment on KAFKA-9220 at 11/26/19 7:56 PM:
---

Hi [~huxi_2b]

I think as per the [doc|#Replicationtools-Howtousethetool?.6], the tool is 
supposed to take --timeout as a param, but in the code it is a hardcoded 
initialization:

def main(args: Array[String]): Unit = {
  val timeout = 3
  run(args, timeout)
}

So, do we need a KIP for this? I think a pull request with the fix should be 
sufficient; let me know so that I can create a pull request for it.

 


was (Author: sbellapu):
Hi [~huxi_2b]

I think as per the 
[doc|https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools#Replicationtools-Howtousethetool?.6],
 the tool is supposed to take --timeout as a param, but in the code it is a 
hardcoded initialization:

def main(args: Array[String]): Unit = {
  val timeout = 3
  run(args, timeout)
}

So, do we need a KIP for this? I think a pull request with the fix should be 
sufficient; let me know so that I can create a pull request for it.

 

> TimeoutException when using kafka-preferred-replica-election
> 
>
> Key: KAFKA-9220
> URL: https://issues.apache.org/jira/browse/KAFKA-9220
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 2.3.0
>Reporter: Or Shemesh
>Priority: Major
>
> When running kafka-preferred-replica-election --bootstrap-server xxx:9092
> I'm getting this error:
> Timeout waiting for election results
> Exception in thread "main" kafka.common.AdminCommandFailedException
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$AdminClientCommand.electPreferredLeaders(PreferredReplicaLeaderElectionCommand.scala:246)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$.run(PreferredReplicaLeaderElectionCommand.scala:78)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand$.main(PreferredReplicaLeaderElectionCommand.scala:42)
>  at 
> kafka.admin.PreferredReplicaLeaderElectionCommand.main(PreferredReplicaLeaderElectionCommand.scala)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to timeout.
>  
> Because we have a big cluster, getting all the data from ZooKeeper takes 
> more than 30 seconds.
>  
> After searching the code I saw that the 30 seconds is hard-coded; can you 
> enable us to set the timeout as a parameter?
> [https://github.com/confluentinc/kafka/blob/master/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9232) Coordinator new member heartbeat completion does not work for JoinGroup v3

2019-11-26 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-9232:
---
Summary: Coordinator new member heartbeat completion does not work for 
JoinGroup v3  (was: Coordinator heartbeat completion does not work for 
JoinGroup v3)

> Coordinator new member heartbeat completion does not work for JoinGroup v3
> --
>
> Key: KAFKA-9232
> URL: https://issues.apache.org/jira/browse/KAFKA-9232
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> For older versions of the JoinGroup API, the coordinator implements a static 
> timeout for new members of 5 minutes. This timeout is implemented using the 
> heartbeat purgatory and we expect that the delayed operation will be force 
> completed if the member successfully joins. This is implemented in 
> GroupCoordinator with the following logic:
> {code:scala}
> group.maybeInvokeJoinCallback(member, joinResult)
> completeAndScheduleNextHeartbeatExpiration(group, member)
> member.isNew = false
> {code}
> However, heartbeat completion depends on this check:
> {code:scala}
>   def shouldKeepAlive(deadlineMs: Long): Boolean = {
> if (isAwaitingJoin)
>   !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > 
> deadlineMs
> else awaitingSyncCallback != null ||
>   latestHeartbeat + sessionTimeoutMs > deadlineMs
>   }
> {code}
> Since we invoke the join callback first, we fall through to the second branch. 
> This will only return true when the latest heartbeat plus session timeout 
> exceeds the deadline. The deadline in this case depends only on the 
> statically configured new member timeout, which means the heartbeat cannot 
> complete until about 5 minutes have passed. If the member falls out of the 
> group before then, then the heartbeat ultimately expires, which may trigger a 
> spurious rebalance.
> Newer versions of the protocol are not affected by this bug because we return 
> immediately the first time a member joins the group.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9232) Coordinator heartbeat completion does not work for JoinGroup v3

2019-11-26 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-9232:
--

 Summary: Coordinator heartbeat completion does not work for 
JoinGroup v3
 Key: KAFKA-9232
 URL: https://issues.apache.org/jira/browse/KAFKA-9232
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Sophie Blee-Goldman


For older versions of the JoinGroup API, the coordinator implements a static 
timeout for new members of 5 minutes. This timeout is implemented using the 
heartbeat purgatory and we expect that the delayed operation will be force 
completed if the member successfully joins. This is implemented in 
GroupCoordinator with the following logic:

{code:scala}
group.maybeInvokeJoinCallback(member, joinResult)
completeAndScheduleNextHeartbeatExpiration(group, member)
member.isNew = false
{code}

However, heartbeat completion depends on this check:

{code:scala}
  def shouldKeepAlive(deadlineMs: Long): Boolean = {
if (isAwaitingJoin)
  !isNew || latestHeartbeat + GroupCoordinator.NewMemberJoinTimeoutMs > 
deadlineMs
else awaitingSyncCallback != null ||
  latestHeartbeat + sessionTimeoutMs > deadlineMs
  }
{code}
Since we invoke the join callback first, we fall through to the second branch. 
This will only return true when the latest heartbeat plus session timeout 
exceeds the deadline. The deadline in this case depends only on the statically 
configured new member timeout, which means the heartbeat cannot complete until 
about 5 minutes have passed. If the member falls out of the group before then, 
then the heartbeat ultimately expires, which may trigger a spurious rebalance.

Newer versions of the protocol are not affected by this bug because we return 
immediately the first time a member joins the group.
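
To make the arithmetic concrete, here is a small self-contained sketch of the 
check above; the session timeout and timestamps are assumed sample values, and 
the pending-sync-callback case is ignored:

{code:scala}
object ShouldKeepAliveDemo extends App {
  val newMemberJoinTimeoutMs = 5 * 60 * 1000L // static new-member join timeout (5 minutes)
  val sessionTimeoutMs = 10000L               // assumed member session timeout
  val latestHeartbeat = 0L                    // member joined at t = 0
  val deadlineMs = latestHeartbeat + newMemberJoinTimeoutMs

  // After the join callback has been invoked, isAwaitingJoin is false, so only
  // the second branch of shouldKeepAlive is evaluated (no sync callback pending):
  val keepAlive = latestHeartbeat + sessionTimeoutMs > deadlineMs

  // Prints false: 10000 > 300000 fails, so the heartbeat cannot be
  // force-completed until nearly the full 5 minutes have elapsed.
  println(keepAlive)
}
{code}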






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9229) initTransactions() fails when running new producer against an old server

2019-11-26 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-9229.

Resolution: Fixed

> initTransactions() fails when running new producer against an old server
> 
>
> Key: KAFKA-9229
> URL: https://issues.apache.org/jira/browse/KAFKA-9229
> Project: Kafka
>  Issue Type: Bug
>Reporter: Steven Zhang
>Assignee: Colin McCabe
>Priority: Major
>
> When using transactions to produce to a topic, I'm encountering this error 
> when I try to initTransactions()
> {code:java}
> [2019-11-22 09:32:39,076] ERROR Closing socket for 
> 10.200.24.250:9092-10.200.24.250:55885-63 because of error 
> (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: INIT_PRODUCER_ID, apiVersion: 2, connectionId: 
> 10.200.24.250:9092-10.200.24.250:55885-63, listenerName: 
> ListenerName(PLAINTEXT), principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error 
> reading field '_tagged_fields': Varint is too long, the most significant bit 
> in the 5th byte is set, converted value: 
> at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
> at org.apache.kafka.common.protocol.ApiKeys.parseRequest(ApiKeys.java:340)
> at org.apache.kafka.common.requests.RequestContext.parseRequest(RequestContext.java:65)
> at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:90)
> at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:931)
> at kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:914)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at kafka.network.Processor.processCompletedReceives(SocketServer.scala:914)
> at kafka.network.Processor.run(SocketServer.scala:804)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> The producer is from the Kafka master branch while the Kafka server is 
> running from the AK 2.4 branch.
> The cause appears to be the commit below: it should have modified version 3, 
> but version 2 (an already existing version) was modified instead.
> [https://github.com/apache/kafka/commit/fecb977b257888e2022c1b1e04dd7bf03e18720c]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8855) Collect and Expose Client's Name and Version in the Brokers

2019-11-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982638#comment-16982638
 ] 

ASF GitHub Bot commented on KAFKA-8855:
---

dajac commented on pull request #7749: KAFKA-8855; Collect and Expose Client's 
Name and Version in the Brokers (KIP-511 Part 2)
URL: https://github.com/apache/kafka/pull/7749
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Collect and Expose Client's Name and Version in the Brokers
> ---
>
> Key: KAFKA-8855
> URL: https://issues.apache.org/jira/browse/KAFKA-8855
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> Implements KIP-511 as documented here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9231) ProducerFencedException may cause Streams Thread to die

2019-11-26 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982634#comment-16982634
 ] 

ASF GitHub Bot commented on KAFKA-9231:
---

vvcephei commented on pull request #7748: KAFKA-9231: Streams: Rebalance when 
fenced in suspend
URL: https://github.com/apache/kafka/pull/7748
 
 
   We missed a branch in which we might catch a ProducerFencedException. It 
should always be converted to a TaskMigratedException.
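   
   A minimal sketch of that conversion, assuming the internal Task handle and 
   a TaskMigratedException(task, cause) constructor as in the 2.4-era 
   internals; illustrative only, not the committed patch:
   
   {code:scala}
   import org.apache.kafka.clients.producer.Producer
   import org.apache.kafka.common.errors.ProducerFencedException
   import org.apache.kafka.streams.errors.TaskMigratedException
   import org.apache.kafka.streams.processor.internals.Task

   // Wrap a producer call so that a fenced producer surfaces as a task
   // migration (triggering a rebalance) instead of killing the stream thread.
   def flushOrMigrate(producer: Producer[Array[Byte], Array[Byte]], task: Task): Unit = {
     try producer.flush()
     catch {
       case fenced: ProducerFencedException =>
         // constructor shape is an assumption based on the 2.4 internals
         throw new TaskMigratedException(task, fenced)
     }
   }
   {code}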
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> ProducerFencedException may cause Streams Thread to die
> ---
>
> Key: KAFKA-9231
> URL: https://issues.apache.org/jira/browse/KAFKA-9231
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> While testing Streams in EOS mode under frequent and heavy network 
> partitions, I've encountered the following error, leading to thread death:
> {noformat}
> [2019-11-26 04:54:02,650] ERROR 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
> org.apache.kafka.streams.errors.StreamsException: stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Failed 
> to rebalance.
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:739)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
> Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread 
> [stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] failed 
> to suspend stream tasks
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:253)
>   at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:116)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.enforceRebalance(StreamThread.java:716)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:710)
>   ... 1 more
> Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task 
> [1_1] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-07
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:279)
>   at 
> org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:175)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:581)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:535)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:660)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:628)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendRunningTasks(AssignedStreamsTasks.java:145)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendOrCloseTasks(AssignedStreamsTasks.java:128)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:246)
>   ... 7 more
> Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
> attempted an operation with an old epoch. Either there is a newer producer 
> with 

[jira] [Created] (KAFKA-9231) ProducerFencedException may cause Streams Thread to die

2019-11-26 Thread John Roesler (Jira)
John Roesler created KAFKA-9231:
---

 Summary: ProducerFencedException may cause Streams Thread to die
 Key: KAFKA-9231
 URL: https://issues.apache.org/jira/browse/KAFKA-9231
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.4.0
Reporter: John Roesler
Assignee: John Roesler


While testing Streams in EOS mode under frequent and heavy network partitions, 
I've encountered the following error, leading to thread death:

{noformat}
[2019-11-26 04:54:02,650] ERROR 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
stream-thread 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
Encountered the following unexpected Kafka exception during processing, this 
usually indicate Streams internal errors: 
(org.apache.kafka.streams.processor.internals.StreamThread)
org.apache.kafka.streams.errors.StreamsException: stream-thread 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Failed 
to rebalance.
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:852)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:739)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:698)
at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:671)
Caused by: org.apache.kafka.streams.errors.StreamsException: stream-thread 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] failed 
to suspend stream tasks
at 
org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:253)
at 
org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsRevoked(StreamsRebalanceListener.java:116)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsRevoked(ConsumerCoordinator.java:291)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onLeavePrepare(ConsumerCoordinator.java:707)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.unsubscribe(KafkaConsumer.java:1073)
at 
org.apache.kafka.streams.processor.internals.StreamThread.enforceRebalance(StreamThread.java:716)
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:710)
... 1 more
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: task [1_1] 
Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-07
at 
org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:279)
at 
org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:175)
at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:581)
at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:535)
at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:660)
at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:628)
at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendRunningTasks(AssignedStreamsTasks.java:145)
at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.suspendOrCloseTasks(AssignedStreamsTasks.java:128)
at 
org.apache.kafka.streams.processor.internals.TaskManager.suspendActiveTasksAndState(TaskManager.java:246)
... 7 more
Caused by: org.apache.kafka.common.errors.ProducerFencedException: Producer 
attempted an operation with an old epoch. Either there is a newer producer with 
the same transactionalId, or the producer's transaction has been expired by the 
broker.
[2019-11-26 04:54:02,650] INFO 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
stream-thread 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] State 
transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN 
(org.apache.kafka.streams.processor.internals.StreamThread)
[2019-11-26 04:54:02,650] INFO 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
stream-thread 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] Shutting 
down (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-11-26 04:54:02,650] INFO 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
[Consumer 
clientId=stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2-restore-consumer,
 groupId=null] Unsubscribed all topics or patterns and assigned partitions 
(org.apache.kafka.clients.consumer.KafkaConsumer)
[2019-11-26 04:54:02,653] INFO 
[stream-soak-test-4290196e-d805-4acd-9f78-b459cc7e99ee-StreamThread-2] 
stream-thread 

[jira] [Commented] (KAFKA-8820) Use Admin API of Replica Reassignment in CLI tools

2019-11-26 Thread Viktor Somogyi-Vass (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982488#comment-16982488
 ] 

Viktor Somogyi-Vass commented on KAFKA-8820:


Hi [~steverod], I had a chat with [~enether] a while back and he said that you 
might not have the bandwidth currently to continue with this. Do you mind if I 
pick it up or are you currently working on this?

> Use Admin API of Replica Reassignment in CLI tools
> --
>
> Key: KAFKA-8820
> URL: https://issues.apache.org/jira/browse/KAFKA-8820
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Gwen Shapira
>Assignee: Steve Rodrigues
>Priority: Major
>
> KIP-455 and KAFKA-8345 add a protocol and AdminAPI that will be used for 
> replica reassignments. We need to update the reassignment tool to use this 
> new API rather than work with ZK directly.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9212) Keep receiving FENCED_LEADER_EPOCH while sending ListOffsetRequest

2019-11-26 Thread Yannick (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982412#comment-16982412
 ] 

Yannick commented on KAFKA-9212:


Yeah, the topic is correctly replicated according to the metadata output from 
tools like kafkacat:

 

As of today, we have downgraded our clients to 2.2.1 to avoid being stuck in 
this fencing loop (the 2.3 client handles FENCED_LEADER_EPOCH).

We restarted the 3 brokers (rolling restart) and still have discrepancies 
between the leader-epoch-checkpoint files, as follows:

 

Broker ID 4 :

cat /var/lib/kafka/logs/connect_ls_config-0/leader-epoch-checkpoint
0
2
0 0
6 22

 

Broker ID 1 :

cat /var/lib/kafka/logs/connect_ls_config-0/leader-epoch-checkpoint
0
2
0 0
5 22

 

Broker ID 3:

cat /var/lib/kafka/logs/connect_ls_config-0/leader-epoch-checkpoint
0
1
0 0

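For reference, as far as I understand the leader-epoch-checkpoint format: 
line 1 is a format version, line 2 is the number of entries, and each 
remaining line is a "leaderEpoch startOffset" pair. Annotating broker 4's 
file under that reading:

{noformat}
0      <- file format version
2      <- number of entries
0 0    <- leader epoch 0 starts at offset 0
6 22   <- leader epoch 6 starts at offset 22 (broker 1 shows epoch 5 here; broker 3 has no entry)
{noformat}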
 

 

Regarding the dump of this topic, here it is (there is just one .log file 
for all brokers; I cannot show the content using print-data-log as it might 
contain sensitive info):

 

Broker ID 1 :

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files 
/var/lib/kafka/logs/connect_ls_config-0/.log
Dumping /var/lib/kafka/logs/connect_ls_config-0/.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 0 CreateTime: 1573660711038 size: 962 magic: 2 
compresscodec: NONE crc: 1786879997 isvalid: true
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 962 CreateTime: 1573660712089 size: 1009 magic: 2 
compresscodec: NONE crc: 1230182444 isvalid: true
baseOffset: 2 lastOffset: 3 count: 2 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 1971 CreateTime: 1573660712091 size: 1957 magic: 2 
compresscodec: NONE crc: 2419651795 isvalid: true
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 3928 CreateTime: 1573660712611 size: 89 magic: 2 
compresscodec: NONE crc: 3321423372 isvalid: true
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 4017 CreateTime: 1573751698440 size: 962 magic: 2 
compresscodec: NONE crc: 704355531 isvalid: true
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 4979 CreateTime: 1573751699462 size: 1009 magic: 2 
compresscodec: NONE crc: 1489459952 isvalid: true
baseOffset: 7 lastOffset: 8 count: 2 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 5988 CreateTime: 1573751699463 size: 1957 magic: 2 
compresscodec: NONE crc: 657348671 isvalid: true
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 7945 CreateTime: 1573751699985 size: 89 magic: 2 
compresscodec: NONE crc: 1825092385 isvalid: true
baseOffset: 10 lastOffset: 11 count: 2 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 8034 CreateTime: 1573828311242 size: 104 magic: 2 
compresscodec: NONE crc: 3533917687 isvalid: true
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 8138 CreateTime: 1573828467292 size: 953 magic: 2 
compresscodec: NONE crc: 232359935 isvalid: true
baseOffset: 13 lastOffset: 13 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 9091 CreateTime: 1573828467807 size: 1000 magic: 2 
compresscodec: NONE crc: 1484213287 isvalid: true
baseOffset: 14 lastOffset: 15 count: 2 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 10091 CreateTime: 1573828467808 size: 1939 magic: 2 
compresscodec: NONE crc: 49865436 isvalid: true
baseOffset: 16 lastOffset: 16 count: 1 baseSequence: -1 lastSequence: -1 
producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false 
isControl: false position: 12030 CreateTime: 1573828468331 size: 94 magic: 2 
compresscodec: NONE crc: 1480833250 isvalid: true
baseOffset: 17 lastOffset: 

[jira] [Assigned] (KAFKA-9230) Change User Customizable Metrics API in StreamsMetrics interface

2019-11-26 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-9230:


Assignee: Bruno Cadonna

> Change User Customizable Metrics API in StreamsMetrics interface
> 
>
> Key: KAFKA-9230
> URL: https://issues.apache.org/jira/browse/KAFKA-9230
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>
> As proposed in KIP-444, the user-customizable metrics API in the 
> StreamsMetrics interface shall be improved. For more details, see 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9230) Change User Customizable Metrics API in StreamsMetrics interface

2019-11-26 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-9230:


 Summary: Change User Customizable Metrics API in StreamsMetrics 
interface
 Key: KAFKA-9230
 URL: https://issues.apache.org/jira/browse/KAFKA-9230
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


As proposed in KIP-444, the user-customizable metrics API in the StreamsMetrics 
interface shall be improved. For more details, see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8769) Consider computing stream time independently per key

2019-11-26 Thread Dmitry Sergeev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8769?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16982294#comment-16982294
 ] 

Dmitry Sergeev commented on KAFKA-8769:
---

We were using the built-in "suppress" operator because we need a single final 
result for each aggregation window. We also need results as close to real time 
as possible; increasing the grace period would delay the computation, which is 
against our requirements.

After some time, we discovered bugs in our system caused by stream time being 
tracked per partition rather than per key. Our current solution is a large 
grace period plus replacing the built-in "suppress" operator with a custom one 
that provides the same semantics but tracks stream time per record key (see 
the sketch below).

Maybe there's a more elegant way to handle our case, but being able to simply 
turn on per-key stream-time tracking and keep using built-in operators like 
"suppress" sounds like what we need to minimize customization.

> Consider computing stream time independently per key
> 
>
> Key: KAFKA-8769
> URL: https://issues.apache.org/jira/browse/KAFKA-8769
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>  Labels: needs-discussion, needs-kip
>
> Currently, Streams uses a concept of "stream time", which is computed as the 
> highest timestamp observed by stateful operators, per partition. This concept 
> of time backs grace period, retention time, and suppression.
> For use cases in which data is produced to topics in roughly chronological 
> order (as in db change capture), this reckoning is fine.
> Some use cases have a different pattern, though. For example, in IOT 
> applications, it's common for sensors to save up quite a bit of data and then 
> dump it all at once into the topic. See 
> https://stackoverflow.com/questions/57082923/kafka-windowed-stream-make-grace-and-suppress-key-aware
>  for a concrete example of the use case.
> I have heard of cases where each sensor dumps 24 hours' worth of data at a 
> time into the topic. This results in a pattern in which, when reading a 
> single partition, the operators observe a lot of consecutive records for one 
> key that increase in timestamp for 24 hours, then a bunch of consecutive 
> records for another key that are also increasing in timestamp over the same 
> 24 hour period. With our current stream-time definition, this means that the 
> partition's stream time increases while reading the first key's data, but 
> then stays paused while reading the second key's data, since the second batch 
> of records all have timestamps in the "past".
> E.g:
> {noformat}
> A@t0 (stream time: 0)
> A@t1 (stream time: 1)
> A@t2 (stream time: 2)
> A@t3 (stream time: 3)
> B@t0 (stream time: 3)
> B@t1 (stream time: 3)
> B@t2 (stream time: 3)
> B@t3 (stream time: 3)
> {noformat}
> This pattern results in an unfortunate compromise in which folks are required 
> to set the grace period to the max expected time skew, for example 24 hours, 
> or Streams will just drop the second key's data (since it is late). But, this 
> means that if they want to use Suppression for "final results", they have to 
> wait 24 hours for the result.
> This tradeoff is not strictly necessary, though, because each key represents 
> a logically independent sequence of events. Tracking by partition is simply 
> convenient, but typically not logically meaningful. That is, the partitions 
> are just physically independent sequences of events, so it's convenient to 
> track stream time at this granularity. It would be just as correct, and more 
> useful for IOT-like use cases, to track time independently for each key.
> However, before considering this change, we need to solve the 
> testing/low-traffic problem. This is the opposite issue, where a partition 
> doesn't get enough traffic to advance stream time and results remain "stuck" 
> in the suppression buffers. We can provide some mechanism to force the 
> advancement of time across all partitions, for use in testing when you want 
> to flush out all results, or in production when some topic is low volume. We 
> shouldn't consider tracking time _more_ granularly until this problem is 
> solved, since it would just make the low-traffic problem worse.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9064) Observing transient issue with kinit command

2019-11-26 Thread Pradeep Bansal (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9064?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pradeep Bansal updated KAFKA-9064:
--
Issue Type: Bug  (was: Improvement)

> Observing transient issue with kinit command
> 
>
> Key: KAFKA-9064
> URL: https://issues.apache.org/jira/browse/KAFKA-9064
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.1
>Reporter: Pradeep Bansal
>Priority: Major
>
> I have specified the kinit command to be skinit. While this works fine most 
> of the time, sometimes I see the exception below, where the provided kinit 
> command is not respected and the default value is used instead. Can this be 
> handled?
>  
> {noformat}
> [2019-02-19 10:20:07,862] WARN [Principal=null]: Could not renew TGT due to 
> problem running shell command: '/usr/bin/kinit -R'. Exiting refresh thread. 
> (org.apache.kafka.common.security.kerberos.KerberosLogin)
> org.apache.kafka.common.utils.Shell$ExitCodeException: kinit: Matching 
> credential not found (filename: /tmp/krb5cc_25012_76850_sshd_w6VpLC8R0Y) 
> while renewing credentials
> at org.apache.kafka.common.utils.Shell.runCommand(Shell.java:130)
> at org.apache.kafka.common.utils.Shell.run(Shell.java:76)
> at org.apache.kafka.common.utils.Shell$ShellCommandExecutor.execute(Shell.java:204)
> at org.apache.kafka.common.utils.Shell.execCommand(Shell.java:268)
> at org.apache.kafka.common.utils.Shell.execCommand(Shell.java:255)
> at org.apache.kafka.common.security.kerberos.KerberosLogin.lambda$login$10(KerberosLogin.java:212)
> at java.base/java.lang.Thread.run(Thread.java:834)
> {noformat}

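For reference, the command used by the TGT renewal thread is driven by the 
sasl.kerberos.kinit.cmd client property; a minimal sketch of the override the 
reporter describes (the skinit path is an assumption):

{code}
# Kafka properties file: command used to refresh the TGT
# (default: /usr/bin/kinit; the path below is illustrative)
sasl.kerberos.kinit.cmd=/usr/bin/skinit
{code}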


--
This message was sent by Atlassian Jira
(v8.3.4#803005)