[jira] [Commented] (KAFKA-9146) Add option to force delete members in stream reset tool

2019-12-16 Thread feyman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997961#comment-16997961
 ] 

feyman commented on KAFKA-9146:
---

[~bchen225242], [~mjsax] I have briefly gone through the code in 
GroupCoordinator and StreamsResetter, but I'm not sure how to reproduce the 
current issue. Could you kindly advise how I can reproduce it? Thanks!

 

> Add option to force delete members in stream reset tool
> ---
>
> Key: KAFKA-9146
> URL: https://issues.apache.org/jira/browse/KAFKA-9146
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>  Labels: newbie
>
> Sometimes people want to reset the stream application sooner, but are blocked 
> by the left-over members inside the group coordinator, which only expire after 
> the session timeout. When a user configures a very long session timeout, this 
> can prevent the group from clearing. We should consider adding support to clean 
> up members by forcing them to leave the group. To do that, 
>  # If the stream application is already on static membership, we could call 
> adminClient.removeMembersFromGroup directly (see the sketch below).
>  # If the application is on dynamic membership, we should modify the 
> adminClient.removeMembersFromGroup interface to allow deletion based on 
> member.id.
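>
> A minimal sketch of the static-membership case, assuming a Kafka 2.4+ admin 
> client (the actual method is removeMembersFromConsumerGroup; the group id and 
> group.instance.id values below are placeholders):
> {code:java}
> import java.util.Arrays;
> import java.util.Properties;
>
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.MemberToRemove;
> import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
>
> public class ForceRemoveStaticMembers {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         try (Admin admin = Admin.create(props)) {
>             // Static members are addressed by their group.instance.id;
>             // "my-streams-app", "instance-1" and "instance-2" are placeholders.
>             RemoveMembersFromConsumerGroupOptions options =
>                 new RemoveMembersFromConsumerGroupOptions(Arrays.asList(
>                     new MemberToRemove("instance-1"),
>                     new MemberToRemove("instance-2")));
>             admin.removeMembersFromConsumerGroup("my-streams-app", options).all().get();
>             // Dynamic members (identified by member.id) would need the
>             // interface extension proposed above.
>         }
>     }
> }
> {code}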



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997925#comment-16997925
 ] 

ASF GitHub Bot commented on KAFKA-9307:
---

dhruvilshah3 commented on pull request #7840: KAFKA-9307: Make transaction 
metadata loading resilient to previous errors
URL: https://github.com/apache/kafka/pull/7840
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from the __transaction_state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a network issue.
>  # Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  # Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  # During the become-follower transition, the broker loses its ZK session 
> again.
>  # The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch associated with the 
> LeaderAndIsrRequest.
>  # Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was the same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  # Because this partition was a partial follower, we had set up replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  # Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.
>  # When processing the become-leader transition, the transaction state load 
> operation failed as we already had transaction metadata loaded at a previous 
> epoch.
>  # This meant that this partition was left in the "loading" state and we thus 
> returned COORDINATOR_LOAD_IN_PROGRESS errors.
> Restarting the broker that hosts the transaction state coordinator is the 
> only way to recover from this situation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-12-16 Thread jiamei xie (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997910#comment-16997910
 ] 

jiamei xie commented on KAFKA-9225:
---

I filed an [issue|https://github.com/facebook/rocksdb/issues/6188] with the 
rocksdb community. They might release a version that backports the arm64 
support related commits to 5.18.3.

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Blocker
>  Labels: incompatible
> Fix For: 3.0.0
>
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download the latest Kafka source code
> 2. Build it with Gradle
> 3. Run the 
> [streams demo|https://kafka.apache.org/23/documentation/streams/quickstart]
> When running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message:
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apache.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni1377754636857652484.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
>  *Analysis:*
> This issue is caused by rocksdbjni-5.18.3.jar, which doesn't ship aarch64 
> native support. Replacing rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] fixes 
> this problem.
> Attached is the binary compatibility report for rocksdbjni.jar between 5.18.3 
> and 6.3.6; the compatibility result is 81.8%. So is it possible to upgrade 
> rocksdbjni to 6.3.6 upstream? If there are any tests I should run, please 
> point me to them. Thanks a lot.
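>
> A quick way to check whether the bundled rocksdbjni native library loads on 
> the current platform, independent of Kafka (a minimal sketch; the /tmp path 
> below is just a scratch directory):
> {code:java}
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
>
> public class RocksDbNativeCheck {
>     public static void main(String[] args) throws RocksDBException {
>         // Throws UnsatisfiedLinkError if rocksdbjni ships no native library
>         // for this OS/architecture (e.g. linux-aarch64 with 5.18.3).
>         RocksDB.loadLibrary();
>         try (Options options = new Options().setCreateIfMissing(true);
>              RocksDB db = RocksDB.open(options, "/tmp/rocksdb-native-check")) {
>             db.put("k".getBytes(), "v".getBytes());
>             System.out.println("rocksdbjni native library loaded and working");
>         }
>     }
> }
> {code}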



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997882#comment-16997882
 ] 

Dhruvil Shah edited comment on KAFKA-9307 at 12/17/19 4:42 AM:
---

Exception during step 6 that led to partial completion of become-follower 
transition:
{code:java}
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: 
clientId=2, correlationId=1, api=LEADER_AND_ISR,
...
{topic=__transaction_state,partition_states=[{...
{partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}
...
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = 
Session expired for /brokers/topics/__transaction_state
at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
at 
kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at 
kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
at 
kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
at 
kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
at 
kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
at 
kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
at 
kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
at 
kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202) 
{code}


was (Author: dhruvilshah):
Exception during step 6 that led to partial completion of become-follower 
transition:
{code:java}
 2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler) 
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259) at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
 at 

[jira] [Commented] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997885#comment-16997885
 ] 

Dhruvil Shah commented on KAFKA-9307:
-

Become-follower transition ignored during step 7:
{code:java}
[2019-12-12 03:08:18,241] TRACE [Broker id=3] Received LeaderAndIsr request 
PartitionState(controllerEpoch=16, leader=4, leaderEpoch=112, isr=2,4,1, 
zkVersion=208, replicas=3,4,2,1, isNew=false) correlation id 1 from controller 
2 epoch 16 for partition __transaction_state-41 (state.change.logger)
[2019-12-12 03:08:18,247] WARN [Broker id=3] Ignoring LeaderAndIsr request from 
controller 2 with correlation id 1 epoch 16 for partition 
__transaction_state-41 since its associated leader epoch 112 is not higher than 
the current leader epoch 112 (state.change.logger)
{code}

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from the __transaction_state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a network issue.
>  # Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  # Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  # During the become-follower transition, the broker loses its ZK session 
> again.
>  # The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch associated with the 
> LeaderAndIsrRequest.
>  # Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was the same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  # Because this partition was a partial follower, we had set up replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  # Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.
>  # When processing the become-leader transition, the transaction state load 
> operation failed as we already had transaction metadata loaded at a previous 
> epoch.
>  # This meant that this partition was left in the "loading" state and we thus 
> returned COORDINATOR_LOAD_IN_PROGRESS errors.
> Restarting the broker that hosts the transaction state coordinator is the 
> only way to recover from this situation.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the transaction state load 
operation failed as we already had transaction metadata loaded at a previous 
epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.

Restarting the broker that hosts the transaction state coordinator is the only 
way to recover from this situation.

  was:
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the transaction state load 
operation failed as we already had transaction metadata loaded at a previous 
epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.
 # Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.


> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from the __transaction_state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The 

[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the transaction state load 
operation failed as we already had transaction metadata loaded at a previous 
epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.
 # Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.

  was:
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.
 # Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.


> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from the __transaction_state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a 

[jira] [Commented] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997883#comment-16997883
 ] 

Dhruvil Shah commented on KAFKA-9307:
-

Exception during step 10 that caused transaction metadata load to fail after 
become-leader transition:
{code:java}
2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager)
[2019-12-12 03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler)
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen
at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
at kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259)
at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java)
at java.lang.Thread.run(Thread.java:748)
{code}

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from the __transaction_state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a network issue.
>  # Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  # Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  # During the become-follower transition, the broker loses its ZK session 
> again.
>  # The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch associated with the 
> LeaderAndIsrRequest.
>  # Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was the same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  # Because this partition was a partial follower, we had set up replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  # Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.
>  # When processing the 

[jira] [Commented] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997882#comment-16997882
 ] 

Dhruvil Shah commented on KAFKA-9307:
-

Exception during step 6 that led to partial completion of become-follower 
transition:
{code:java}
 2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler) 
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259) at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
 at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114) at 
kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java) 
at java.lang.Thread.run(Thread.java:748)
{code}

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from the __transaction_state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  # The broker is the leader of one (or more) transaction state topic 
> partitions.
>  # The broker loses its ZK session due to a network issue.
>  # Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  # Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  # During the become-follower transition, the broker loses its ZK session 
> again.
>  # The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch associated with the 
> LeaderAndIsrRequest.
>  # Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was the same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  # Because this partition was a partial follower, we had set up replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  # Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.
>  # When processing the become-leader transition, the operation failed as we 
> already had transaction metadata loaded at a previous epoch.
>  # This meant that 

[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 # The broker is the leader of one (or more) transaction state topic partitions.
 # The broker loses its ZK session due to a network issue.
 # Broker reestablishes session with ZK, though there are still transient 
network issues.
 # Broker is made follower of the transaction state topic partition it was 
leading earlier.
 # During the become-follower transition, the broker loses its ZK session again.
 # The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.
 # Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.
 # Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.
 # Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.
 # When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.
 # This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.
 # Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.

  was:
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.

 
 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.

```
 2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler) 
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at 

[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.

 
 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.

```
 2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' (kafka.utils.KafkaScheduler) 
java.lang.IllegalStateException: The metadata cache for txn partition 41 has 
already exist with epoch 111 and 6 entries while trying to add to it; this 
should not happen at 
kafka.coordinator.transaction.TransactionStateManager.addLoadedTransactionsToCache(TransactionStateManager.scala:369)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply$mcV$sp(TransactionStateManager.scala:394)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1$1.apply(TransactionStateManager.scala:393)
 at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251) at 
kafka.utils.CoreUtils$.inWriteLock(CoreUtils.scala:259) at 
kafka.coordinator.transaction.TransactionStateManager.kafka$coordinator$transaction$TransactionStateManager$$loadTransactions$1(TransactionStateManager.scala:392)
 at 
kafka.coordinator.transaction.TransactionStateManager$$anonfun$loadTransactionsForTxnTopicPartition$2.apply$mcV$sp(TransactionStateManager.scala:426)
 at 
kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:114) at 
kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:63) at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
 at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java) 
at java.lang.Thread.run(Thread.java:748)
 ```
 * This meant that this partition was left in the "loading" state and we thus 
returned COORDINATOR_LOAD_IN_PROGRESS errors.

 * Broker restart fixed this partial in-memory state and we were able to resume 
processing for transactions.

  was:
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses 

[jira] [Updated] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah updated KAFKA-9307:

Description: 
We observed a case where the transaction coordinator could not load transaction 
state from the __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.

```
 [2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: 
clientId=2, correlationId=1, api=LEADER_AND_ISR, ... 
{topic=__transaction_state,partition_states=[{...

{partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}

... org.apache.zookeeper.KeeperException$SessionExpiredException: 
KeeperErrorCode = Session expired for /brokers/topics/__transaction_state at 
org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at 
org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at 
kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537) at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
 at 
kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at 
kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574) 
at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624) at 
kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
 at 
kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
 at 
kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
 at 
kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
 at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
 at 
kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
 at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at 
kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
 at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153) 
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202) 
 ```
 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch.

```
2019-12-12 03:11:56,320] ERROR [Transaction State Manager 3]: The metadata 
cache for txn partition 41 has already exist with epoch 111 and 6 entries while 
trying to add to it; this should not happen 
(kafka.coordinator.transaction.TransactionStateManager) [2019-12-12 
03:11:56,320] ERROR Uncaught exception in scheduled task 
'load-txns-for-partition-__transaction_state-41' 

[jira] [Assigned] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-9307:
---

Assignee: Dhruvil Shah

> Transaction coordinator could be left in unknown state after ZK session 
> timeout
> ---
>
> Key: KAFKA-9307
> URL: https://issues.apache.org/jira/browse/KAFKA-9307
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Major
>
> We observed a case where the transaction coordinator could not load 
> transaction state from the __transaction_state topic partition. Clients would 
> continue seeing COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker 
> hosting the coordinator is restarted.
> This is the sequence of events that leads to the issue:
>  * The broker is the leader of one (or more) transaction state topic 
> partitions.
>  * The broker loses its ZK session due to a network issue.
>  * Broker reestablishes session with ZK, though there are still transient 
> network issues.
>  * Broker is made follower of the transaction state topic partition it was 
> leading earlier.
>  * During the become-follower transition, the broker loses its ZK session 
> again.
>  * The become-follower transition for this broker fails in-between, leaving 
> us in a partial leader / partial follower state for the transaction topic. 
> This meant that we could not unload the transaction metadata. However, the 
> broker successfully caches the leader epoch associated with the 
> LeaderAndIsrRequest.
> ```
> [2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: 
> clientId=2, correlationId=1, api=LEADER_AND_ISR, ... 
> \{topic=__transaction_state,partition_states=[{... 
> {partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false}
>  ... org.apache.zookeeper.KeeperException$SessionExpiredException: 
> KeeperErrorCode = Session expired for /brokers/topics/__transaction_state at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:130) at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:54) at 
> kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537) at 
> kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
>  at 
> kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>  at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at 
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:104) at 
> kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574) 
> at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624) at 
> kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
>  at 
> kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
>  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at 
> kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
>  at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
> kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202) at 
> kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153) 
> at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202) 
> ```
>  * Later, when the ZK session is finally established successfully, the broker 
> ignores the become-follower transition as the leader epoch was the same as the 
> one it had cached. This prevented the transaction metadata from being 
> unloaded.
>  * Because this partition was a partial follower, we had set up replica 
> fetchers. The partition continued to fetch from the leader until it was made 
> part of the ISR.
>  * Once it was part of the ISR, preferred leader election kicked in and 
> elected this broker as the leader.

[jira] [Created] (KAFKA-9307) Transaction coordinator could be left in unknown state after ZK session timeout

2019-12-16 Thread Dhruvil Shah (Jira)
Dhruvil Shah created KAFKA-9307:
---

 Summary: Transaction coordinator could be left in unknown state 
after ZK session timeout
 Key: KAFKA-9307
 URL: https://issues.apache.org/jira/browse/KAFKA-9307
 Project: Kafka
  Issue Type: Bug
  Components: core
Reporter: Dhruvil Shah


We observed a case where the transaction coordinator could not load transaction 
state from a __transaction_state topic partition. Clients would continue seeing 
COORDINATOR_LOAD_IN_PROGRESS exceptions until the broker hosting the 
coordinator is restarted.

This is the sequence of events that leads to the issue:
 * The broker is the leader of one (or more) transaction state topic partitions.
 * The broker loses its ZK session due to a network issue.

 * Broker reestablishes session with ZK, though there are still transient 
network issues.
 * Broker is made follower of the transaction state topic partition it was 
leading earlier.

 * During the become-follower transition, the broker loses its ZK session again.
 * The become-follower transition for this broker fails in-between, leaving us 
in a partial leader / partial follower state for the transaction topic. This 
meant that we could not unload the transaction metadata. However, the broker 
successfully caches the leader epoch associated with the LeaderAndIsrRequest.

```
[2019-12-12 03:08:17,864] ERROR [KafkaApi-3] Error when handling request: clientId=2, correlationId=1, api=LEADER_AND_ISR, ... {topic=__transaction_state,partition_states=[{... {partition=41,controller_epoch=16,leader=4,leader_epoch=112,isr=[2,4,1],zk_version=208,replicas=[3,4,2,1],is_new=false} ...
org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired for /brokers/topics/__transaction_state
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:130)
    at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
    at kafka.zookeeper.AsyncResponse.resultException(ZooKeeperClient.scala:537)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:579)
    at kafka.zk.KafkaZkClient$$anonfun$getReplicaAssignmentForTopics$1.apply(KafkaZkClient.scala:574)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
    at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:574)
    at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:624)
    at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:279)
    at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:465)
    at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:434)
    at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:190)
    at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:186)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:186)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.KafkaApis$$anonfun$2.apply(KafkaApis.scala:202)
    at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1153)
    at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:202)
```

 * Later, when the ZK session is finally established successfully, the broker 
ignores the become-follower transition as the leader epoch was the same as the one 
it had cached. This prevented the transaction metadata from being unloaded.

 * Because this partition was a partial follower, we had set up replica 
fetchers. The partition continued to fetch from the leader until it was made 
part of the ISR.

 * Once it was part of the ISR, preferred leader election kicked in and elected 
this broker as the leader.

 * When processing the become-leader transition, the operation failed as we 
already had transaction metadata loaded at a previous epoch. This meant that 
this partition was left in the "loading" state and we thus returned 
COORDINATOR_LOAD_IN_PROGRESS errors.

 * Broker restart fixed this partial in-memory state and we were able to resume 
processing for 

[jira] [Commented] (KAFKA-9243) Update the javadocs from KeyValueStore to TimestampKeyValueStore

2019-12-16 Thread Demitri Swan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997865#comment-16997865
 ] 

Demitri Swan commented on KAFKA-9243:
-

Thanks for adding me as a contributor. Looking forward to helping out.

Would someone please provide an example and more clarification? In the regions 
of code where I see `TimestampedKeyValueStore` referenced in the method 
signatures, I do not see incorrect references to `KeyValueStores` in the 
Javadocs. Also, the [current KTables 
JavaDoc|https://kafka.apache.org/24/javadoc/org/apache/kafka/streams/kstream/KTable.html]
 does reference KeyValueStores within the Javadocs, but they're also referenced 
in the method signatures. 

_Caveat: I'm new to the codebase_

> Update the javadocs from KeyValueStore to TimestampKeyValueStore
> 
>
> Key: KAFKA-9243
> URL: https://issues.apache.org/jira/browse/KAFKA-9243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Walker Carlson
>Assignee: Demitri Swan
>Priority: Minor
>  Labels: beginner, newbie
>
> As of version 2.3, the DSL uses `TimestampedStores` to represent KTables. 
> However, the JavaDocs of all table-related operators still refer to plain 
> `KeyValueStores` etc instead of `TimestampedKeyValueStore` etc. Hence, all 
> those JavaDocs should be updated (the JavaDocs are technically not incorrect, 
> because one can access a TimestampedKeyValueStore as a KeyValueStore, too – 
> hence this ticket is not a "bug" but an improvement).
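
For illustration, a minimal sketch of what the description above refers to: the same KTable materialization can be queried both as a timestamped store and as a plain key-value store. The store name "counts-store" and the key/value types are assumptions, not from the ticket.

{code:java}
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

public class StoreAccessExample {

    // Query the same KTable materialization ("counts-store") through both views.
    public static void query(final KafkaStreams streams) {
        // Timestamped view, matching what the DSL actually materializes since 2.3:
        final ReadOnlyKeyValueStore<String, ValueAndTimestamp<Long>> timestamped =
            streams.store("counts-store", QueryableStoreTypes.timestampedKeyValueStore());

        // Plain key-value view still works, which is why the existing JavaDocs are
        // "technically not incorrect", just out of date:
        final ReadOnlyKeyValueStore<String, Long> plain =
            streams.store("counts-store", QueryableStoreTypes.keyValueStore());

        System.out.println(timestamped.get("some-key"));
        System.out.println(plain.get("some-key"));
    }
}
{code}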



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9148) Consider forking RocksDB for Streams

2019-12-16 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9148:
---
Description: 
We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
someone from Flink recently discovered a ~8% [performance 
regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all 
versions 5.18+ (up through the current newest version, 6.2.2). Flink was able 
to react to this by downgrading to 5.17 and [picking the 
WriteBufferManager|https://github.com/dataArtisans/frocksdb/pull/4] to their 
fork (fRocksDB).

Due to this and other reasons enumerated below, we should consider also forking 
our own RocksDB for Streams.

Pros:
 * We can avoid passing sudden breaking changes on to our users, such as removal 
of methods with no deprecation period (see discussion on KAFKA-8897)
 * We can pick whichever version has the best performance for our needs, and 
pick over any new features, metrics, etc that we need to use rather than being 
forced to upgrade (and breaking user code, introducing regression, etc)
 * Support for some architectures does not exist in all RocksDB versions, 
making Streams completely unusable for some users until we can upgrade the 
rocksdb dependency to one that supports their specific case. It's worth noting 
that we've only had [one user|https://issues.apache.org/jira/browse/KAFKA-9225] 
hit this so far (that we know of), and some workarounds have been discussed on 
the ticket.
 * The Java API seems to be a very low priority to the rocksdb folks.
 ** They leave out critical functionality, features, and configuration options 
that have been in the c++ API for a very long time
 ** Those that do make it over often have random gaps in the API such as 
setters but no getters (see [rocksdb PR 
#5186|https://github.com/facebook/rocksdb/pull/5186])
 ** Others are poorly designed and require too many trips across the JNI, 
making otherwise incredibly useful features prohibitively expensive.
 *** [Custom 
Comparator|https://github.com/facebook/rocksdb/issues/538#issuecomment-83145980]:
 a custom comparator could significantly improve the performance of session 
windows. This is trivial to do but given the high performance cost of crossing 
the jni, it is currently only practical to use a c++ comparator
 *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not 
currently used by Streams but a commonly requested feature, and may also allow 
improved range queries
 ** Even when an external contributor develops a solution for poorly performing 
Java functionality and helpfully tries to contribute their patch back to 
rocksdb, it gets ignored by the rocksdb people ([rocksdb PR 
#2283|https://github.com/facebook/rocksdb/pull/2283])

Cons:
 * More work (not to be trivialized, the truth is we don't and can't know how 
much extra work this will ultimately be)

Given that we rarely upgrade the Rocks dependency, use only some fraction of 
its features, and would need or want to make only minimal changes ourselves, it 
seems like we could actually get away with very little extra work by forking 
rocksdb. Note that as of this writing the frocksdb repo has only needed to open 
5 PRs on top of the actual rocksdb (two of them trivial). Of course, the LOE to 
maintain this will only grow over time, so we should think carefully about 
whether and when to start taking on this potential burden.
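
For context, a rough sketch of the kind of configuration the WriteBufferManager enables when wired in through Streams' RocksDBConfigSetter: a single cache and write buffer manager shared by every store so total RocksDB memory stays bounded. The class name and sizes below are illustrative assumptions, not anything Streams ships or requires.

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.WriteBufferManager;

public class BoundedMemoryConfigSetter implements RocksDBConfigSetter {

    // Shared across every store so total RocksDB memory stays bounded (sizes are placeholders).
    private static final Cache CACHE = new LRUCache(256 * 1024 * 1024L);
    private static final WriteBufferManager WRITE_BUFFER_MANAGER =
        new WriteBufferManager(64 * 1024 * 1024L, CACHE);

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(CACHE);               // block cache drawn from the shared LRUCache
        tableConfig.setCacheIndexAndFilterBlocks(true); // count index/filter blocks against the cache too
        options.setTableFormatConfig(tableConfig);
        options.setWriteBufferManager(WRITE_BUFFER_MANAGER); // memtables accounted against the same cache
    }

    @Override
    public void close(final String storeName, final Options options) {
        // The cache and write buffer manager are static and shared across stores,
        // so they are intentionally not closed per store here.
    }
}
{code}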

 

  was:
We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
someone from Flink recently discovered a ~8% [performance 
regression|https://github.com/facebook/rocksdb/issues/5774] that exists in all 
versions 5.18+ (up through the current newest version, 6.2.2). Flink was able 
to react to this by downgrading to 5.17 and [picking the 
WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their 
fork (fRocksDB).

Due to this and other reasons enumerated below, we should consider also forking 
our own RocksDB for Streams.

Pros:
 * We can avoid passing sudden breaking changes on to our users, such removal 
of methods with no deprecation period (see discussion on KAFKA-8897)
 * We can pick whichever version has the best performance for our needs, and 
pick over any new features, metrics, etc that we need to use rather than being 
forced to upgrade (and breaking user code, introducing regression, etc)
 * Support for some architectures does not exist in all RocksDB versions, 
making Streams completely unusable for some users until we can upgrade the 
rocksdb dependency to one that supports their specific case
 * The Java API seems to be a very low priority to the rocksdb folks.
 ** They leave out critical functionality, features, and 

[jira] [Commented] (KAFKA-9148) Consider forking RocksDB for Streams

2019-12-16 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997804#comment-16997804
 ] 

Sophie Blee-Goldman commented on KAFKA-9148:


Thanks [~adamretter]! It helps a lot to have some insight into how the two 
projects differ in terms of priorities, process, definitions of 
compatibility/Major version changes, etc. For example given that AK seems to 
have stricter guidelines around breaking changes, it probably is to our 
advantage if the Java API makes some tradeoffs for greater stability (even if 
that's just a side effect of lagging the C++ API). 

And I should probably clarify that the intention of this ticket was more to 
keep track of these issues that have come up so they don't get lost to the 
ether, than to push strongly for forking rocksdb at this (or any) time.

The custom comparator in particular would go a long way for us. Is there an 
open ticket/issue that we could use to track progress on this? I just saw the 
new benchmark results for [PR 
#2283|https://github.com/facebook/rocksdb/pull/2283] and they look very 
promising! I also know one of our users who is interested in adding support 
for prefix seeking to Streams has recently been looking into running benchmarks 
on this feature. It'll be interesting to see how those results pan out as well.

> Consider forking RocksDB for Streams 
> -
>
> Key: KAFKA-9148
> URL: https://issues.apache.org/jira/browse/KAFKA-9148
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
> abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
> someone from Flink recently discovered a ~8% [performance 
> regression|https://github.com/facebook/rocksdb/issues/5774] that exists in 
> all versions 5.18+ (up through the current newest version, 6.2.2). Flink was 
> able to react to this by downgrading to 5.17 and [picking the 
> WriteBufferManager|https://github.com/dataArtisans/frocksdb/pull/4] to their 
> fork (fRocksDB).
> Due to this and other reasons enumerated below, we should consider also 
> forking our own RocksDB for Streams.
> Pros:
>  * We can avoid passing sudden breaking changes on to our users, such as removal 
> of methods with no deprecation period (see discussion on KAFKA-8897)
>  * We can pick whichever version has the best performance for our needs, and 
> pick over any new features, metrics, etc that we need to use rather than 
> being forced to upgrade (and breaking user code, introducing regression, etc)
>  * Support for some architectures does not exist in all RocksDB versions, 
> making Streams completely unusable for some users until we can upgrade the 
> rocksdb dependency to one that supports their specific case
>  * The Java API seems to be a very low priority to the rocksdb folks.
>  ** They leave out critical functionality, features, and configuration 
> options that have been in the c++ API for a very long time
>  ** Those that do make it over often have random gaps in the API such as 
> setters but no getters (see [rocksdb PR 
> #5186|https://github.com/facebook/rocksdb/pull/5186])
>  ** Others are poorly designed and require too many trips across the JNI, 
> making otherwise incredibly useful features prohibitively expensive.
>  *** [Custom 
> comparator|https://github.com/facebook/rocksdb/issues/538#issuecomment-83145980]: a custom comparator could 
> significantly improve the performance of session windows. This is trivial to 
> do but given the high performance cost of crossing the jni, it is currently 
> only practical to use a c++ comparator
>  *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not 
> currently used by Streams but a commonly requested feature, and may also 
> allow improved range queries
>  ** Even when an external contributor develops a solution for poorly 
> performing Java functionality and helpfully tries to contribute their patch 
> back to rocksdb, it gets ignored by the rocksdb people ([rocksdb PR 
> #2283|https://github.com/facebook/rocksdb/pull/2283])
> Cons:
>  * More work (not to be trivialized, the truth is we don't and can't know how 
> much extra work this will ultimately be)
> Given that we rarely upgrade the Rocks dependency, use only some fraction of 
> its features, and would need or want to make only minimal changes ourselves, 
> it seems like we could actually get away with very little extra work by 
> forking rocksdb. Note that as of this writing the frocksdb repo has only 
> needed to open 5 PRs on top of the actual rocksdb (two of them trivial). Of 
> course, the LOE to maintain this will only grow over time, so we should think 
> carefully about whether and when to start taking on this potential burden.
>  



--
This message 

[jira] [Commented] (KAFKA-9304) Image on Kafka docs shows incorrect message ID segments

2019-12-16 Thread Victoria Bialas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997784#comment-16997784
 ] 

Victoria Bialas commented on KAFKA-9304:


[~mjsax] it does make sense, I can see that the lower expanded box on the right 
was probably copied from the top right box and not modified. 

But I have to say, regarding the top box, I understand where they are getting 
the first and last (top and bottom) message numbers: 34477849968 and 
3551592051, respectively, are the first and last offsets in the first row under 
"Active Segment List". However, where are they getting the other offset numbers 
in that expanded view? e.g., 34477850175? Where does that come from? It isn't 
sequential after 34477849968.

> Image on Kafka docs shows incorrect message ID segments
> ---
>
> Key: KAFKA-9304
> URL: https://issues.apache.org/jira/browse/KAFKA-9304
> Project: Kafka
>  Issue Type: Bug
>Reporter: Victoria Bialas
>Assignee: Victoria Bialas
>Priority: Minor
>
>  
> Docs page: [https://kafka.apache.org/documentation/#log]
> Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]
> Hi Kafka team, looks like there is issue with below depicting image on Kafka 
> documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
> incorrectly mentioned. Message should start from 82xx but starting from 34xx 
> like in 1st segment. Please correct.
> [~guozhang] , [~mjsax] , this came through on a Tweet, I'll try to fix on the 
> docs. May need some guidance from you, as the problem description isn't 
> completely clear to me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9304) Image on Kafka docs shows incorrect message ID segments

2019-12-16 Thread Victoria Bialas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997784#comment-16997784
 ] 

Victoria Bialas edited comment on KAFKA-9304 at 12/17/19 1:56 AM:
--

[~mjsax] it does make sense, I can see that the lower expanded box on the right 
was probably copied from the top right box and not modified.

But I have to say, regarding the top box, I do understand where they are 
getting the first and last (top and bottom) message numbers: 34477849968 and 
3551592051, respectively, are the first and last offsets in the first row under 
"Active Segment List". However, where are they getting the other offset numbers 
in that expanded view? e.g., 34477850175? Where does that come from? It isn't 
sequential after 34477849968.


was (Author: orangesnap):
[~mjsax] it does make sense, I can see that the lower expanded box on the right 
was probably copied from the top right box and not modified. 

But I have to say, regarding the top box, I understand where they are getting 
the first and last (top and bottom) message numbers: 34477849968 and 
3551592051, respectively, are the first and last offsets in the first row under 
"Active Segment List". However, where are they getting the other offset numbers 
in that expanded view? e.g., 34477850175? Where does that come from ? It isn't 
sequential after 34477849968.

> Image on Kafka docs shows incorrect message ID segments
> ---
>
> Key: KAFKA-9304
> URL: https://issues.apache.org/jira/browse/KAFKA-9304
> Project: Kafka
>  Issue Type: Bug
>Reporter: Victoria Bialas
>Assignee: Victoria Bialas
>Priority: Minor
>
>  
> Docs page: [https://kafka.apache.org/documentation/#log]
> Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]
> Hi Kafka team, looks like there is issue with below depicting image on Kafka 
> documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
> incorrectly mentioned. Message should start from 82xx but starting from 34xx 
> like in 1st segment. Please correct.
> [~guozhang] , [~mjsax] , this came through on a Tweet, I'll try to fix on the 
> docs. May need some guidance from you, as the problem description isn't 
> completely clear to me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9306) Kafka Consumer does not clean up all metrics after shutdown

2019-12-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997754#comment-16997754
 ] 

ASF GitHub Bot commented on KAFKA-9306:
---

cmccabe commented on pull request #7839: KAFKA-9306: The consumer must close 
KafkaConsumerMetrics
URL: https://github.com/apache/kafka/pull/7839
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Consumer does not clean up all metrics after shutdown
> ---
>
> Key: KAFKA-9306
> URL: https://issues.apache.org/jira/browse/KAFKA-9306
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0
>Reporter: Colin McCabe
>Assignee: Colin McCabe
>Priority: Major
>
> The Kafka Consumer does not clean up all metrics after shutdown.  It seems 
> like this was a regression introduced in Kafka 2.4 when we added the 
> KafkaConsumerMetrics class.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9306) Kafka Consumer does not clean up all metrics after shutdown

2019-12-16 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-9306:
---

 Summary: Kafka Consumer does not clean up all metrics after 
shutdown
 Key: KAFKA-9306
 URL: https://issues.apache.org/jira/browse/KAFKA-9306
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0
Reporter: Colin McCabe
Assignee: Colin McCabe


The Kafka Consumer does not clean up all metrics after shutdown.  It seems like 
this was a regression introduced in Kafka 2.4 when we added the 
KafkaConsumerMetrics class.
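
For illustration, a hypothetical way to observe the leak from a test or a main method: consumer metrics are registered under the kafka.consumer JMX domain, so any MBeans left behind after close() point at metrics that were not cleaned up. The broker address and group id below are placeholders.

{code:java}
import java.lang.management.ManagementFactory;
import java.util.Properties;
import java.util.Set;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerMetricsLeakCheck {

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("group.id", "metrics-leak-check");      // placeholder group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();

        // After close(), this query should come back empty; anything left over
        // is a metric that was not cleaned up during shutdown.
        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        final Set<ObjectName> leftover = server.queryNames(new ObjectName("kafka.consumer:*"), null);
        System.out.println("Consumer MBeans still registered after close(): " + leftover);
    }
}
{code}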



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9113) Clean up task management

2019-12-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997745#comment-16997745
 ] 

ASF GitHub Bot commented on KAFKA-9113:
---

guozhangwang commented on pull request #7833: KAFKA-9113: Extract clients from 
tasks to record collectors
URL: https://github.com/apache/kafka/pull/7833
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Clean up task management
> 
>
> Key: KAFKA-9113
> URL: https://issues.apache.org/jira/browse/KAFKA-9113
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>
> Along KIP-429 we did a lot of refactoring of the task management classes, 
> including the TaskManager and AssignedTasks (and children).  While hopefully 
> easier to reason about, there's still significant opportunity for further 
> cleanup, including safer state tracking.  Some potential improvements:
> 1) Verify that no tasks are ever in more than one state at once. One 
> possibility is to just check that the suspended, created, restoring, and 
> running maps are all disjoint, but this begs the question of when and where 
> to do those checks, and how often. Another idea might be to put all tasks 
> into a single map and just track their state on a per-task basis. Whatever it 
> is should be aware that some methods are on the critical code path, and 
> should not be burdened with excessive safety checks (ie 
> AssignedStreamTasks#process). Alternatively, it seems to make sense to just 
> make each state its own type. We can then do some cleanup of the AbstractTask 
> and StreamTask classes, which currently contain a number of methods specific 
> to only one type/state of task. For example
>  * only active running tasks ever need to be suspendable, yet every task goes 
> through suspend then closeSuspended during close.
>  * as the name suggests, closeSuspended should technically only ever apply to 
> suspended tasks
>  * the code paths needed to perform certain actions such as closing or 
> committing a task vary widely between the different states. A restoring task 
> need only close its state manager, but skipping the task.close call and 
> calling only closeStateManager has led to confusion and time wasted trying 
> to remember or ask someone why that is sufficient
> 2) Cleanup of closing and/or shutdown logic – there are some potential 
> improvements to be made here as well, for example AssignedTasks currently 
> implements a closeZombieTask method despite the fact that standby tasks are 
> never zombies. 
> 3)  The StoreChangelogReader also interacts with (only) the 
> AssignedStreamsTasks class, through the TaskManager. It can be difficult to 
> reason about these interactions and the state of the changelog reader.
> 4) All 4 classes and their state have very strict consistency requirements 
> that currently are almost impossible to verify, which has already resulted in 
> several bugs that we were lucky to catch in time. We should tighten up how 
> these classes manage their own state, and how the overall state is managed 
> between them, so that it is easy to make changes without introducing new bugs 
> because one class updated its own state without knowing it needed to tell 
> another class to also update its
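
As a rough sketch of the invariant check suggested in item 1 of the description above: no task id may appear in more than one of the per-state maps. The map names and types are assumptions, not the actual TaskManager fields.

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class TaskStateInvariant {

    // Throws if any key (e.g. a task id) is tracked in more than one state map.
    static <K> void verifyDisjoint(final Map<K, ?> created,
                                   final Map<K, ?> restoring,
                                   final Map<K, ?> running,
                                   final Map<K, ?> suspended) {
        final List<Set<K>> keySets = Arrays.asList(
            created.keySet(), restoring.keySet(), running.keySet(), suspended.keySet());
        for (int i = 0; i < keySets.size(); i++) {
            for (int j = i + 1; j < keySets.size(); j++) {
                if (!Collections.disjoint(keySets.get(i), keySets.get(j))) {
                    throw new IllegalStateException("Task tracked in more than one state map");
                }
            }
        }
    }
}
{code}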



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9161) Close gaps in Streams configs documentation

2019-12-16 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997741#comment-16997741
 ] 

Sophie Blee-Goldman edited comment on KAFKA-9161 at 12/17/19 12:29 AM:
---

upgrade.from fixed by [https://github.com/apache/kafka/pull/7825]


was (Author: ableegoldman):
upgrade.from

> Close gaps in Streams configs documentation
> ---
>
> Key: KAFKA-9161
> URL: https://issues.apache.org/jira/browse/KAFKA-9161
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: beginner, newbie
>
> There are a number of Streams configs that aren't documented in the 
> "Configuring a Streams Application" section of the docs. As of 2.3 the 
> missing configs are:
>  # default.windowed.key.serde.inner ^
>  # default.windowed.value.serde.inner ^
>  # max.task.idle.ms
>  # rocksdb.config.setter. ^^
>  # topology.optimization
>  # -upgrade.from- fixed
>  ^ these configs are also missing the corresponding DOC string
>  ^^ this one actually does appear on that page, but instead of being included 
> in the list of Streams configs it is for some reason under  "Consumer and 
> Producer Configuration Parameters" ?
> There are also a few configs whose documented name is slightly incorrect, as 
> it is missing the "default" prefix that appears in the actual code. The 
> "missing-default" configs are:
>  # key.serde
>  # value.serde
>  # timestamp.extractor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9161) Close gaps in Streams configs documentation

2019-12-16 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997741#comment-16997741
 ] 

Sophie Blee-Goldman commented on KAFKA-9161:


upgrade.from

> Close gaps in Streams configs documentation
> ---
>
> Key: KAFKA-9161
> URL: https://issues.apache.org/jira/browse/KAFKA-9161
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: beginner, newbie
>
> There are a number of Streams configs that aren't documented in the 
> "Configuring a Streams Application" section of the docs. As of 2.3 the 
> missing configs are:
>  # default.windowed.key.serde.inner ^
>  # default.windowed.value.serde.inner ^
>  # max.task.idle.ms
>  # rocksdb.config.setter. ^^
>  # topology.optimization
>  # -upgrade.from- fixed
>  ^ these configs are also missing the corresponding DOC string
>  ^^ this one actually does appear on that page, but instead of being included 
> in the list of Streams configs it is for some reason under  "Consumer and 
> Producer Configuration Parameters" ?
> There are also a few configs whose documented name is slightly incorrect, as 
> it is missing the "default" prefix that appears in the actual code. The 
> "missing-default" configs are:
>  # key.serde
>  # value.serde
>  # timestamp.extractor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9161) Close gaps in Streams configs documentation

2019-12-16 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9161?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-9161:
---
Description: 
There are a number of Streams configs that aren't documented in the 
"Configuring a Streams Application" section of the docs. As of 2.3 the missing 
configs are:
 # default.windowed.key.serde.inner ^
 # default.windowed.value.serde.inner ^
 # max.task.idle.ms
 # rocksdb.config.setter. ^^
 # topology.optimization
 # -upgrade.from- fixed

 ^ these configs are also missing the corresponding DOC string

 ^^ this one actually does appear on that page, but instead of being included 
in the list of Streams configs it is for some reason under "Consumer and 
Producer Configuration Parameters"?

There are also a few configs whose documented name is slightly incorrect, as it 
is missing the "default" prefix that appears in the actual code. The 
"missing-default" configs are:
 # key.serde
 # value.serde
 # timestamp.extractor
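
For reference, a sketch of where these settings are supplied; the config key strings are the ones listed above, while the application id, bootstrap servers, values, and the config-setter class name are placeholders:

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsConfigGapExample {

    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "docs-gap-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Configs called out above as missing from the docs (keys exactly as listed in the ticket):
        props.put("max.task.idle.ms", "0");
        props.put("topology.optimization", "all");
        props.put("default.windowed.key.serde.inner", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.windowed.value.serde.inner", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("rocksdb.config.setter", "com.example.MyRocksDBConfigSetter"); // placeholder class name

        // Configs the docs list without the "default." prefix, although the actual keys include it:
        props.put("default.key.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde", "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.timestamp.extractor", "org.apache.kafka.streams.processor.FailOnInvalidTimestamp");
        return props;
    }
}
{code}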

  was:
There are a number of Streams configs that aren't documented in the 
"Configuring a Streams Application" section of the docs. As of 2.3 the missing 
configs are:
 # default.windowed.key.serde.inner *
 # default.windowed.value.serde.inner *
 # max.task.idle.ms
 # rocksdb.config.setter. **
 # topology.optimization
 # upgrade.from

* these configs are also missing the corresponding DOC string
** this one actually does appear on that page, but instead of being included in 
the list of Streams configs it is for some reason under  "Consumer and Producer 
Configuration Parameters" ?

There are also a few configs whose documented name is slightly incorrect, ie it 
is missing the "default" prefix that appears in the actual code. I assume this 
was not intentional because there are other configs that do have the "default" 
prefix included, so it doesn't seem to be dropped on purpose. The 
"missing-default" configs are:
 # key.serde
 # value.serde
 # timestamp.extractor


> Close gaps in Streams configs documentation
> ---
>
> Key: KAFKA-9161
> URL: https://issues.apache.org/jira/browse/KAFKA-9161
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: beginner, newbie
>
> There are a number of Streams configs that aren't documented in the 
> "Configuring a Streams Application" section of the docs. As of 2.3 the 
> missing configs are:
>  # default.windowed.key.serde.inner ^
>  # default.windowed.value.serde.inner ^
>  # max.task.idle.ms
>  # rocksdb.config.setter. ^^
>  # topology.optimization
>  # -upgrade.from- fixed
>  ^ these configs are also missing the corresponding DOC string
>  ^^ this one actually does appear on that page, but instead of being included 
> in the list of Streams configs it is for some reason under  "Consumer and 
> Producer Configuration Parameters" ?
> There are also a few configs whose documented name is slightly incorrect, as 
> it is missing the "default" prefix that appears in the actual code. The 
> "missing-default" configs are:
>  # key.serde
>  # value.serde
>  # timestamp.extractor



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8784) Remove default implementation of RocksDBConfigSetter#close

2019-12-16 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8784:
---
Fix Version/s: 3.0.0

> Remove default implementation of RocksDBConfigSetter#close
> --
>
> Key: KAFKA-8784
> URL: https://issues.apache.org/jira/browse/KAFKA-8784
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In 2.3 we added a new method to RocksDBConfigSetter that will be called when 
> a store is closed to clean up any resources that may have been created in 
> #setConfig. Many large objects such as the block cache might be instantiated 
> in that method, which will leak memory if not closed since they are backed by 
> C++ objects.
> We should consider removing the default implementation to force users to pay 
> attention to whether they may be leaking memory, even if it may be a no-op 
> for many. 
>  
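
To make the concern concrete, a sketch of the kind of implementation removing the default would force users to think about: a setter that allocates a native-backed cache in #setConfig and must release it in #close. The class name, cache size, and the per-store-cache design are assumptions for illustration only.

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class PerStoreCacheConfigSetter implements RocksDBConfigSetter {

    private Cache cache; // created in setConfig, backed by C++ memory, must be freed in close

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        cache = new LRUCache(16 * 1024 * 1024L); // 16 MiB, placeholder size
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(cache);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        cache.close(); // without this, the native memory behind the cache leaks
    }
}
{code}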



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8784) Remove default implementation of RocksDBConfigSetter#close

2019-12-16 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8784?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman updated KAFKA-8784:
---
Affects Version/s: (was: 3.0.0)

> Remove default implementation of RocksDBConfigSetter#close
> --
>
> Key: KAFKA-8784
> URL: https://issues.apache.org/jira/browse/KAFKA-8784
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Blocker
> Fix For: 3.0.0
>
>
> In 2.3 we added a new method to RocksDBConfigSetter that will be called when 
> a store is closed to clean up any resources that may have been created in 
> #setConfig. Many large objects such as the block cache might be instantiated 
> in that method, which will leak memory if not closed since they are backed by 
> C++ objects.
> We should consider removing the default implementation to force users to pay 
> attention to whether they may be leaking memory, even if it may be a no-op 
> for many. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9305) Add version 2.4 to streams system tests

2019-12-16 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-9305:
--

Assignee: Bruno Cadonna  (was: Sophie Blee-Goldman)

>  Add version 2.4 to streams system tests
> 
>
> Key: KAFKA-9305
> URL: https://issues.apache.org/jira/browse/KAFKA-9305
> Project: Kafka
>  Issue Type: Bug
>Reporter: Manikumar
>Assignee: Bruno Cadonna
>Priority: Major
> Fix For: 2.5.0
>
>
> cc [~mjsax]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9305) Add version 2.4 to streams system tests

2019-12-16 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-9305:
--

Assignee: Sophie Blee-Goldman

>  Add version 2.4 to streams system tests
> 
>
> Key: KAFKA-9305
> URL: https://issues.apache.org/jira/browse/KAFKA-9305
> Project: Kafka
>  Issue Type: Bug
>Reporter: Manikumar
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.5.0
>
>
> cc [~mjsax]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9305) Add version 2.4 to streams system tests

2019-12-16 Thread Manikumar (Jira)
Manikumar created KAFKA-9305:


 Summary:  Add version 2.4 to streams system tests
 Key: KAFKA-9305
 URL: https://issues.apache.org/jira/browse/KAFKA-9305
 Project: Kafka
  Issue Type: Bug
Reporter: Manikumar
 Fix For: 2.5.0


cc [~mjsax]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2019-12-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck reassigned KAFKA-9273:
--

Assignee: Sujay Hegde

> Refactor AbstractJoinIntegrationTest and Sub-classes
> 
>
> Key: KAFKA-9273
> URL: https://issues.apache.org/jira/browse/KAFKA-9273
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: newbie
>
> The AbstractJoinIntegrationTest uses an embedded broker, but not all the 
> sub-classes require the use of an embedded broker anymore.  Additionally, 
> there are two tests remaining that require an embedded broker, but they don't 
> perform joins; they are tests validating other conditions, so ideally those 
> tests should move into a separate test



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2019-12-16 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997705#comment-16997705
 ] 

Bill Bejeck commented on KAFKA-9273:


Hi [~sujayopensource]

Yes, feel free to start working on this task.  I've assigned this ticket to 
you.  

Thanks for your interest in contributing.

Bill

> Refactor AbstractJoinIntegrationTest and Sub-classes
> 
>
> Key: KAFKA-9273
> URL: https://issues.apache.org/jira/browse/KAFKA-9273
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Priority: Major
>  Labels: newbie
>
> The AbstractJoinIntegrationTest uses an embedded broker, but not all the 
> sub-classes require the use of an embedded broker anymore.  Additionally, 
> there are two tests remaining that require an embedded broker, but they don't 
> perform joins; they are tests validating other conditions, so ideally those 
> tests should move into a separate test



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9304) Image on Kafka docs shows incorrect message ID segments

2019-12-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997678#comment-16997678
 ] 

Matthias J. Sax commented on KAFKA-9304:


[~orangesnap] Each line in the left box represents a segment, and the two 
numbers in the row indicate the start-offset and end-offset of the segment. The 
very first segment is "expanded" to the right top box, showing a line for 
each message in the segment. You can observe that the start-offset in the left 
hand box matches the offset of the first message in the right hand box (same 
for end-offset and last message).

The last segment is also "expanded"; however, the offsets in the right bottom box 
don't match the offsets from the last row. Does this make sense?

> Image on Kafka docs shows incorrect message ID segments
> ---
>
> Key: KAFKA-9304
> URL: https://issues.apache.org/jira/browse/KAFKA-9304
> Project: Kafka
>  Issue Type: Bug
>Reporter: Victoria Bialas
>Assignee: Victoria Bialas
>Priority: Minor
>
>  
> Docs page: [https://kafka.apache.org/documentation/#log]
> Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]
> Hi Kafka team, looks like there is issue with below depicting image on Kafka 
> documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
> incorrectly mentioned. Message should start from 82xx but starting from 34xx 
> like in 1st segment. Please correct.
> [~guozhang] , [~mjsax] , this came through on a Tweet, I'll try to fix on the 
> docs. May need some guidance from you, as the problem description isn't 
> completely clear to me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8025) Flaky Test RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls

2019-12-16 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8025?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997656#comment-16997656
 ] 

Bill Bejeck commented on KAFKA-8025:


I ran this test locally 10K times and I was not able to get a repeat failure.

> Flaky Test 
> RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest#shouldForwardAllDbOptionsCalls
> 
>
> Key: KAFKA-8025
> URL: https://issues.apache.org/jira/browse/KAFKA-8025
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.3.0
>Reporter: Konstantine Karantasis
>Assignee: Bill Bejeck
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.5.0
>
>
> At least one occurrence where the following unit test case failed on a Jenkins 
> job that didn't involve any related changes. 
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/2783/consoleFull]
> I have not been able to reproduce it locally on Linux. (For instance 20 
> consecutive runs of this class pass all test cases)
> {code:java}
> 14:06:13 
> org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest
>  > shouldForwardAllDbOptionsCalls STARTED 14:06:14 
> org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls
>  failed, log available in 
> /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.12/streams/build/reports/testOutput/org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls.test.stdout
>  14:06:14 14:06:14 
> org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest
>  > shouldForwardAllDbOptionsCalls FAILED 14:06:14     
> java.lang.AssertionError: 14:06:14     Expected: a string matching the 
> pattern 'Unexpected method call DBOptions\.baseBackgroundCompactions((.* 
> 14:06:14     *)*):' 14:06:14          but: was "Unexpected method call 
> DBOptions.baseBackgroundCompactions():\n    DBOptions.close(): expected: 3, 
> actual: 0" 14:06:14         at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) 14:06:14         
> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) 14:06:14       
>   at 
> org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.verifyDBOptionsMethodCall(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:121)
>  14:06:14         at 
> org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.shouldForwardAllDbOptionsCalls(RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java:101)
>  14:06:14 14:06:14 
> org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest
>  > shouldForwardAllColumnFamilyCalls STARTED 14:06:14 14:06:14 
> org.apache.kafka.streams.state.internals.RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest
>  > shouldForwardAllColumnFamilyCalls PASSED
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9303) Kafka 2.4.0 releaseTarGZ target produces kafka_2.12-2.4.1-SNAPSHOT.tgz instead of kafka_2.12-2.4.1.tgz

2019-12-16 Thread Philip Johnson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Johnson resolved KAFKA-9303.
---
Resolution: Not A Bug

> Kafka 2.4.0 releaseTarGZ target produces kafka_2.12-2.4.1-SNAPSHOT.tgz 
> instead of kafka_2.12-2.4.1.tgz
> --
>
> Key: KAFKA-9303
> URL: https://issues.apache.org/jira/browse/KAFKA-9303
> Project: Kafka
>  Issue Type: Bug
>  Components: build
>Affects Versions: 2.4.0
>Reporter: Philip Johnson
>Priority: Major
>
> The SNAPSHOT should be removed in the 2.4.0 tag.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9304) Image on Kafka docs shows incorrect message ID segments

2019-12-16 Thread Victoria Bialas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Bialas updated KAFKA-9304:
---
Description: 
 

Docs page: [https://kafka.apache.org/documentation/#log]

Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]

Hi Kafka team, looks like there is issue with below depicting image on Kafka 
documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
incorrectly mentioned. Message should start from 82xx but starting from 34xx 
like in 1st segment. Please correct.

[~guozhang] , [~mjsax] , this came through on a Tweet, I'll try to fix on the 
docs. May need some guidance from you, as the problem description isn't 
completely clear to me.

  was:
 

Docs page: [https://kafka.apache.org/documentation/#log]

Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]

Hi Kafka team, looks like there is issue with below depicting image on Kafka 
documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
incorrectly mentioned. Message should start from 82xx but starting from 34xx 
like in 1st segment. Please correct.

 

[~mjsax] , this came through on a Tweet, I'll try to fix on the docs. May need 
some guidance from you, as the problem description isn't completely clear to me.


> Image on Kafka docs shows incorrect message ID segments
> ---
>
> Key: KAFKA-9304
> URL: https://issues.apache.org/jira/browse/KAFKA-9304
> Project: Kafka
>  Issue Type: Bug
>Reporter: Victoria Bialas
>Assignee: Victoria Bialas
>Priority: Minor
>
>  
> Docs page: [https://kafka.apache.org/documentation/#log]
> Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]
> Hi Kafka team, looks like there is issue with below depicting image on Kafka 
> documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
> incorrectly mentioned. Message should start from 82xx but starting from 34xx 
> like in 1st segment. Please correct.
> [~guozhang] , [~mjsax] , this came through on a Tweet, I'll try to fix on the 
> docs. May need some guidance from you, as the problem description isn't 
> completely clear to me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9304) Image on Kafka docs shows incorrect message ID segments

2019-12-16 Thread Victoria Bialas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Bialas reassigned KAFKA-9304:
--

Assignee: Victoria Bialas

> Image on Kafka docs shows incorrect message ID segments
> ---
>
> Key: KAFKA-9304
> URL: https://issues.apache.org/jira/browse/KAFKA-9304
> Project: Kafka
>  Issue Type: Bug
>Reporter: Victoria Bialas
>Assignee: Victoria Bialas
>Priority: Minor
>
>  
> Docs page: [https://kafka.apache.org/documentation/#log]
> Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]
> Hi Kafka team, looks like there is issue with below depicting image on Kafka 
> documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
> incorrectly mentioned. Message should start from 82xx but starting from 34xx 
> like in 1st segment. Please correct.
>  
> [~mjsax] if you will assign this to me, I'll try to fix on the docs. May need 
> some guidance from you, as the problem description isn't completely clear to 
> me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9304) Image on Kafka docs shows incorrect message ID segments

2019-12-16 Thread Victoria Bialas (Jira)
Victoria Bialas created KAFKA-9304:
--

 Summary: Image on Kafka docs shows incorrect message ID segments
 Key: KAFKA-9304
 URL: https://issues.apache.org/jira/browse/KAFKA-9304
 Project: Kafka
  Issue Type: Bug
Reporter: Victoria Bialas


 

Docs page: [https://kafka.apache.org/documentation/#log]

Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]

Hi Kafka team, looks like there is issue with below depicting image on Kafka 
documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
incorrectly mentioned. Message should start from 82xx but starting from 34xx 
like in 1st segment. Please correct.

 

[~mjsax] if you will assign this to me, I'll try to fix on the docs. May need 
some guidance from you, as the problem description isn't completely clear to me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9304) Image on Kafka docs shows incorrect message ID segments

2019-12-16 Thread Victoria Bialas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Bialas updated KAFKA-9304:
---
Description: 
 

Docs page: [https://kafka.apache.org/documentation/#log]

Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]

Hi Kafka team, looks like there is issue with below depicting image on Kafka 
documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
incorrectly mentioned. Message should start from 82xx but starting from 34xx 
like in 1st segment. Please correct.

 

[~mjsax] , this came through on a Tweet, I'll try to fix on the docs. May need 
some guidance from you, as the problem description isn't completely clear to me.

  was:
 

Docs page: [https://kafka.apache.org/documentation/#log]

Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]

Hi Kafka team, looks like there is issue with below depicting image on Kafka 
documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
incorrectly mentioned. Message should start from 82xx but starting from 34xx 
like in 1st segment. Please correct.

 

[~mjsax] if you will assign this to me, I'll try to fix on the docs. May need 
some guidance from you, as the problem description isn't completely clear to me.


> Image on Kafka docs shows incorrect message ID segments
> ---
>
> Key: KAFKA-9304
> URL: https://issues.apache.org/jira/browse/KAFKA-9304
> Project: Kafka
>  Issue Type: Bug
>Reporter: Victoria Bialas
>Assignee: Victoria Bialas
>Priority: Minor
>
>  
> Docs page: [https://kafka.apache.org/documentation/#log]
> Link to Tweet: [https://twitter.com/Preety48408391/status/1205764249995202560]
> Hi Kafka team, looks like there is issue with below depicting image on Kafka 
> documentation section 5.4. In 2nd segment 82xx.kafka, Message IDs are 
> incorrectly mentioned. Message should start from 82xx but starting from 34xx 
> like in 1st segment. Please correct.
>  
> [~mjsax] , this came through on a Tweet, I'll try to fix on the docs. May 
> need some guidance from you, as the problem description isn't completely 
> clear to me.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9303) Kafka 2.4.0 releaseTarGZ target produces kafka_2.12-2.4.1-SNAPSHOT.tgz instead of kafka_2.12-2.4.1.tgz

2019-12-16 Thread Philip Johnson (Jira)
Philip Johnson created KAFKA-9303:
-

 Summary: Kafka 2.4.0 releaseTarGZ target produces 
kafka_2.12-2.4.1-SNAPSHOT.tgz instead of kafka_2.12-2.4.1.tgz
 Key: KAFKA-9303
 URL: https://issues.apache.org/jira/browse/KAFKA-9303
 Project: Kafka
  Issue Type: Bug
  Components: build
Affects Versions: 2.4.0
Reporter: Philip Johnson


The SNAPSHOT should be removed in the 2.4.0 tag.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9243) Update the javadocs from KeyValueStore to TimestampKeyValueStore

2019-12-16 Thread Demitri Swan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Demitri Swan reassigned KAFKA-9243:
---

Assignee: Demitri Swan

> Update the javadocs from KeyValueStore to TimestampKeyValueStore
> 
>
> Key: KAFKA-9243
> URL: https://issues.apache.org/jira/browse/KAFKA-9243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Walker Carlson
>Assignee: Demitri Swan
>Priority: Minor
>  Labels: beginner, newbie
>
> As of version 2.3, the DSL uses `TimestampedStores` to represent KTables. 
> However, the JavaDocs of all table-related operators still refer to plain 
> `KeyValueStores` etc instead of `TimestampedKeyValueStore` etc. Hence, all 
> those JavaDocs should be updated (the JavaDocs are technically not incorrect, 
> because one can access a TimestampedKeyValueStore as a KeyValueStore, too – 
> hence this ticket is not a "bug" but an improvement).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9233) Kafka consumer throws undocumented IllegalStateException

2019-12-16 Thread Andrew Olson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997540#comment-16997540
 ] 

Andrew Olson commented on KAFKA-9233:
-

[~junrao] or [~hachikuji] Can you review this?

> Kafka consumer throws undocumented IllegalStateException
> 
>
> Key: KAFKA-9233
> URL: https://issues.apache.org/jira/browse/KAFKA-9233
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0
>Reporter: Andrew Olson
>Assignee: Andrew Olson
>Priority: Minor
>
> If the provided collection of TopicPartition instances contains any 
> duplicates, an IllegalStateException not documented in the javadoc is thrown 
> by internal Java stream code when calling KafkaConsumer#beginningOffsets or 
> KafkaConsumer#endOffsets.
> The stack trace looks like this,
> {noformat}
> java.lang.IllegalStateException: Duplicate key -2
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:555)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOffsets(Fetcher.java:542)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2054)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.beginningOffsets(KafkaConsumer.java:2031)
> {noformat}
> {noformat}
> java.lang.IllegalStateException: Duplicate key -1
>   at 
> java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133)
>   at java.util.HashMap.merge(HashMap.java:1254)
>   at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320)
>   at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>   at 
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>   at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>   at 
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>   at 
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>   at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>   at 
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.beginningOrEndOffset(Fetcher.java:559)
>   at 
> org.apache.kafka.clients.consumer.internals.Fetcher.endOffsets(Fetcher.java:550)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2109)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.endOffsets(KafkaConsumer.java:2081)
> {noformat}
> Looking at the code, it appears this was likely introduced by 
> KAFKA-7831. The exception is not thrown in Kafka 2.2.1, where the duplicated 
> TopicPartition values are silently ignored. Either we should document the 
> possibility of this exception (probably wrapping it in a Kafka exception class) 
> indicating invalid client API usage, or restore the previous behavior where 
> the duplicates were harmless.
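Until that decision is made, a minimal client-side guard is to deduplicate the collection before calling the offset APIs; a sketch (the topic name and the consumer setup are assumptions for illustration):

{code:java}
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class OffsetLookup {
    public static Map<TopicPartition, Long> safeEndOffsets(final KafkaConsumer<?, ?> consumer,
                                                           final List<TopicPartition> partitions) {
        // A List containing the same TopicPartition twice triggers the
        // IllegalStateException described above; a Set cannot contain duplicates.
        final Set<TopicPartition> deduplicated = new HashSet<>(partitions);
        return consumer.endOffsets(deduplicated);
    }

    public static void main(final String[] args) {
        // Duplicate entry on purpose; "test-topic" is only an illustrative name.
        final List<TopicPartition> partitions = Arrays.asList(
            new TopicPartition("test-topic", 0),
            new TopicPartition("test-topic", 0),
            new TopicPartition("test-topic", 1));
        // safeEndOffsets(consumer, partitions) succeeds where
        // consumer.endOffsets(partitions) throws on 2.3.0.
    }
}
{code}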



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9297) CreateTopic API do not work with older version of the request/response

2019-12-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-9297:
---
Fix Version/s: 2.5.0

> CreateTopic API do not work with older version of the request/response
> --
>
> Key: KAFKA-9297
> URL: https://issues.apache.org/jira/browse/KAFKA-9297
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.5.0
>
>
> The create topic API does not work with older versions of the API. It can be 
> reproduced by trying to create a topic with `kafka-topics.sh` from 2.3. It 
> times out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9297) CreateTopic API do not work with older version of the request/response

2019-12-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-9297.

Resolution: Fixed

> CreateTopic API do not work with older version of the request/response
> --
>
> Key: KAFKA-9297
> URL: https://issues.apache.org/jira/browse/KAFKA-9297
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.5.0
>
>
> The create topic API does not work with older versions of the API. It can be 
> reproduced by trying to create a topic with `kafka-topics.sh` from 2.3. It 
> times out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9259) suppress() for windowed-Serdes does not work with default serdes

2019-12-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997533#comment-16997533
 ] 

Matthias J. Sax commented on KAFKA-9259:


Consider KAFKA-9260, especially Guozhang's comment on it; it might be a good 
idea to address KAFKA-9260 as it would resolve KAFKA-9248 and KAFKA-9259 at 
once IMHO. When I created KAFKA-9260, it was unclear to me how it could be done, 
but Guozhang's proposal makes a lot of sense to me. Of course, KAFKA-9260 would 
require a KIP and the fix would only be available in future releases.

Hence, I am also fine if we just fix `suppress()` directly, and also backport 
the fix to 2.4 and 2.3, as KAFKA-9260 will only apply to 2.5...

> suppress() for windowed-Serdes does not work with default serdes
> 
>
> Key: KAFKA-9259
> URL: https://issues.apache.org/jira/browse/KAFKA-9259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Major
>  Labels: newbie
>
> The suppress() operator either inherits serdes from its upstream operator or 
> falls back to default serdes from the config.
> If the upstream operator is a windowed aggregation, the window-aggregation 
> operator wraps the user passed-in serde with a window-serde and pushes it 
> into suppress() – however, if default serdes are used, the window-aggregation 
> operator cannot push anything into suppress(). At runtime, it just creates a 
> default serde and wraps it accordingly. For this case, suppress() also falls 
> back to default serdes; however, it does not wrap the serde and thus a 
> ClassCastException is thrown when the serde is used later.
> suppress() is already aware of whether the upstream aggregation is time/session 
> windowed or not and thus should use this information to wrap default serdes 
> accordingly.
> The current workaround for windowed-suppress is to overwrite the default 
> serde upstream to suppress(), such that suppress() inherits serdes and does 
> not fall back to default serdes.
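To make that workaround concrete, here is a minimal sketch (topic names, window size, and serdes are assumptions for illustration) that passes explicit serdes to the windowed aggregation so that suppress() inherits a properly wrapped windowed serde instead of falling back to defaults:

{code:java}
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.state.WindowStore;

public class SuppressWorkaround {
    public static void build(final StreamsBuilder builder) {
        final KTable<Windowed<String>, Long> counts = builder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
            // Explicit serdes here mean suppress() inherits a windowed serde
            // from upstream instead of an unwrapped default serde.
            .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.Long()))
            .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

        // Explicit windowed serde for the sink as well, for the same reason.
        counts.toStream().to("output-topic",
            Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class), Serdes.Long()));
    }
}
{code}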



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9148) Consider forking RocksDB for Streams

2019-12-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997527#comment-16997527
 ] 

Matthias J. Sax commented on KAFKA-9148:


[~adamretter] Thanks a lot for your response! Highly appreciated. From my 
personal view, it would be beneficial to not fork RocksDB but to work closer 
with the RocksDB community instead.

About:
{quote}If we were following SemVer 2, then it is fine/expected to make breaking 
API changes in Major version changes. Regardless, the API of RocksJava closely 
follows the C++ API, if it was not deprecated in the C++ API, then it would not 
have been deprecated in the Java API.
{quote}
While it is fine to make breaking changes in major versions, it's pretty harsh 
to remove APIs without a deprecation period. In Kafka, we are more conservative, 
and we would never remove APIs without one. Hence, we generally only remove APIs 
in major releases after a deprecation period of at least two prior minor releases.

> Consider forking RocksDB for Streams 
> -
>
> Key: KAFKA-9148
> URL: https://issues.apache.org/jira/browse/KAFKA-9148
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> We recently upgraded our RocksDB dependency to 5.18 for its memory-management 
> abilities (namely the WriteBufferManager, see KAFKA-8215). Unfortunately, 
> someone from Flink recently discovered a ~8% [performance 
> regression|https://github.com/facebook/rocksdb/issues/5774] that exists in 
> all versions 5.18+ (up through the current newest version, 6.2.2). Flink was 
> able to react to this by downgrading to 5.17 and [picking the 
> WriteBufferManage|https://github.com/dataArtisans/frocksdb/pull/4]r to their 
> fork (fRocksDB).
> Due to this and other reasons enumerated below, we should consider also 
> forking our own RocksDB for Streams.
> Pros:
>  * We can avoid passing sudden breaking changes on to our users, such as the 
> removal of methods with no deprecation period (see discussion on KAFKA-8897)
>  * We can pick whichever version has the best performance for our needs, and 
> pick over any new features, metrics, etc that we need to use rather than 
> being forced to upgrade (and breaking user code, introducing regression, etc)
>  * Support for some architectures does not exist in all RocksDB versions, 
> making Streams completely unusable for some users until we can upgrade the 
> rocksdb dependency to one that supports their specific case
>  * The Java API seems to be a very low priority to the rocksdb folks.
>  ** They leave out critical functionality, features, and configuration 
> options that have been in the c++ API for a very long time
>  ** Those that do make it over often have random gaps in the API such as 
> setters but no getters (see [rocksdb PR 
> #5186|https://github.com/facebook/rocksdb/pull/5186])
>  ** Others are poorly designed and require too many trips across the JNI, 
> making otherwise incredibly useful features prohibitively expensive.
>  *** [Custom comparator|#issuecomment-83145980]]: a custom comparator could 
> significantly improve the performance of session windows. This is trivial to 
> do but given the high performance cost of crossing the jni, it is currently 
> only practical to use a c++ comparator
>  *** [Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not 
> currently used by Streams but a commonly requested feature, and may also 
> allow improved range queries
>  ** Even when an external contributor develops a solution for poorly 
> performing Java functionality and helpfully tries to contribute their patch 
> back to rocksdb, it gets ignored by the rocksdb people ([rocksdb PR 
> #2283|https://github.com/facebook/rocksdb/pull/2283])
> Cons:
>  * More work (not to be trivialized, the truth is we don't and can't know how 
> much extra work this will ultimately be)
> Given that we rarely upgrade the Rocks dependency, use only some fraction of 
> its features, and would need or want to make only minimal changes ourselves, 
> it seems like we could actually get away with very little extra work by 
> forking rocksdb. Note that as of this writing the frocksdb repo has only 
> needed to open 5 PRs on top of the actual rocksdb (two of them trivial). Of 
> course, the LOE to maintain this will only grow over time, so we should think 
> carefully about whether and when to start taking on this potential burden.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9297) CreateTopic API do not work with older version of the request/response

2019-12-16 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997525#comment-16997525
 ] 

ASF GitHub Bot commented on KAFKA-9297:
---

mimaison commented on pull request #7829: KAFKA-9297; CreateTopic API do not 
work with older version of the request/response
URL: https://github.com/apache/kafka/pull/7829
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> CreateTopic API do not work with older version of the request/response
> --
>
> Key: KAFKA-9297
> URL: https://issues.apache.org/jira/browse/KAFKA-9297
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> The create topic API does not work with older versions of the API. It can be 
> reproduced by trying to create a topic with `kafka-topics.sh` from 2.3. It 
> times out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9243) Update the javadocs from KeyValueStore to TimestampKeyValueStore

2019-12-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997520#comment-16997520
 ] 

Matthias J. Sax commented on KAFKA-9243:


Thanks for your interest [~miroswan]! I added you to the list of contributors, 
which allows you to self-assign tickets.

I would recommend checking out [https://kafka.apache.org/contributing] to get 
started. Looking forward to your PR.

> Update the javadocs from KeyValueStore to TimestampKeyValueStore
> 
>
> Key: KAFKA-9243
> URL: https://issues.apache.org/jira/browse/KAFKA-9243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Walker Carlson
>Priority: Minor
>  Labels: beginner, newbie
>
> As of version 2.3, the DSL uses `TimestampedStores` to represent KTables. 
> However, the JavaDocs of all table-related operators still refer to plain 
> `KeyValueStores` etc. instead of `TimestampedKeyValueStore` etc. Hence, all 
> those JavaDocs should be updated (the JavaDocs are technically not incorrect, 
> because one can access a TimestampedKeyValueStore as a KeyValueStore, too – 
> hence this ticket is not a "bug" but an improvement).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9243) Update the javadocs from KeyValueStore to TimestampKeyValueStore

2019-12-16 Thread Demitri Swan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997491#comment-16997491
 ] 

Demitri Swan commented on KAFKA-9243:
-

 I am available to take this one. 

> Update the javadocs from KeyValueStore to TimestampKeyValueStore
> 
>
> Key: KAFKA-9243
> URL: https://issues.apache.org/jira/browse/KAFKA-9243
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Walker Carlson
>Priority: Minor
>  Labels: beginner, newbie
>
> As of version 2.3, the DSL uses `TimestampedStores` to represent KTables. 
> However, the JavaDocs of all table-related operators still refer to plain 
> `KeyValueStores` etc. instead of `TimestampedKeyValueStore` etc. Hence, all 
> those JavaDocs should be updated (the JavaDocs are technically not incorrect, 
> because one can access a TimestampedKeyValueStore as a KeyValueStore, too – 
> hence this ticket is not a "bug" but an improvement).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-12-16 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997490#comment-16997490
 ] 

Bill Bejeck commented on KAFKA-8705:


cherry-picked to 2.4

> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode
> -
>
> Key: KAFKA-8705
> URL: https://issues.apache.org/jira/browse/KAFKA-8705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Hiroshi Nakahara
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode.
> Kafka Stream version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
> public class Main {
> public static void main(String[] args) {
> final StreamsBuilder streamsBuilder = new StreamsBuilder();
> final KStream<Integer, Integer> parentStream = 
> streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), 
> Serdes.Integer()))
> .selectKey(Integer::sum);  // To make parentStream 
> KeyChaingingPoint
> final KStream<Integer, Integer> childStream1 = 
> parentStream.mapValues(v -> v + 1);
> final KStream<Integer, Integer> childStream2 = 
> parentStream.mapValues(v -> v + 2);
> final KStream<Integer, Integer> childStream3 = 
> parentStream.mapValues(v -> v + 3);
> childStream1
> .merge(childStream2)
> .merge(childStream3)
> .to("outputTopic");
> final Properties properties = new Properties();
> properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
> StreamsConfig.OPTIMIZE);
> streamsBuilder.build(properties);
> }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create Topology without throwing Exception.  The 
> expected topology is:
> {code:java}
> Topologies:
>Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>   --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>   --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, 
> KSTREAM-MAPVALUES-04
>   <-- KSTREAM-SOURCE-00
> Processor: KSTREAM-MAPVALUES-02 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-03 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-04 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MERGE-05 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
> Processor: KSTREAM-MERGE-06 (stores: [])
>   --> KSTREAM-SINK-07
>   <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
> Sink: KSTREAM-SINK-07 (topic: outputTopic)
>   <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> NullPointerException was thrown with the following stacktrace.
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>   at 
> org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>   at Main.main(Main.java:24){code}
> h3. Cause
> This exception occurs in 
> InternalStreamsBuilder#maybeUpdateKeyChaingingRepartitionNodeMap.
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
> final Map<StreamsGraphNode, LinkedHashSet<StreamsGraphNode>> 
> mergeNodesToKeyChangers = new HashMap<>();
> for (final StreamsGraphNode mergeNode : mergeNodes) {
> mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
> final Collection<StreamsGraphNode> keys = 
> 

[jira] [Updated] (KAFKA-8705) NullPointerException was thrown by topology optimization when two MergeNodes have common KeyChaingingNode

2019-12-16 Thread Bill Bejeck (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8705?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bill Bejeck updated KAFKA-8705:
---
Fix Version/s: 2.4.1

> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode
> -
>
> Key: KAFKA-8705
> URL: https://issues.apache.org/jira/browse/KAFKA-8705
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Hiroshi Nakahara
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.2.3, 2.5.0, 2.3.2, 2.4.1
>
>
> NullPointerException was thrown by topology optimization when two MergeNodes 
> have common KeyChaingingNode.
> Kafka Stream version: 2.3.0
> h3. Code
> {code:java}
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.StreamsBuilder;
> import org.apache.kafka.streams.StreamsConfig;
> import org.apache.kafka.streams.kstream.Consumed;
> import org.apache.kafka.streams.kstream.KStream;
> import java.util.Properties;
> public class Main {
> public static void main(String[] args) {
> final StreamsBuilder streamsBuilder = new StreamsBuilder();
> final KStream<Integer, Integer> parentStream = 
> streamsBuilder.stream("parentTopic", Consumed.with(Serdes.Integer(), 
> Serdes.Integer()))
> .selectKey(Integer::sum);  // To make parentStream 
> KeyChaingingPoint
> final KStream<Integer, Integer> childStream1 = 
> parentStream.mapValues(v -> v + 1);
> final KStream<Integer, Integer> childStream2 = 
> parentStream.mapValues(v -> v + 2);
> final KStream<Integer, Integer> childStream3 = 
> parentStream.mapValues(v -> v + 3);
> childStream1
> .merge(childStream2)
> .merge(childStream3)
> .to("outputTopic");
> final Properties properties = new Properties();
> properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, 
> StreamsConfig.OPTIMIZE);
> streamsBuilder.build(properties);
> }
> }
> {code}
> h3. Expected result
> streamsBuilder.build should create Topology without throwing Exception.  The 
> expected topology is:
> {code:java}
> Topologies:
>Sub-topology: 0
> Source: KSTREAM-SOURCE-00 (topics: [parentTopic])
>   --> KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-KEY-SELECT-01 (stores: [])
>   --> KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03, 
> KSTREAM-MAPVALUES-04
>   <-- KSTREAM-SOURCE-00
> Processor: KSTREAM-MAPVALUES-02 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-03 (stores: [])
>   --> KSTREAM-MERGE-05
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MAPVALUES-04 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-KEY-SELECT-01
> Processor: KSTREAM-MERGE-05 (stores: [])
>   --> KSTREAM-MERGE-06
>   <-- KSTREAM-MAPVALUES-02, KSTREAM-MAPVALUES-03
> Processor: KSTREAM-MERGE-06 (stores: [])
>   --> KSTREAM-SINK-07
>   <-- KSTREAM-MERGE-05, KSTREAM-MAPVALUES-04
> Sink: KSTREAM-SINK-07 (topic: outputTopic)
>   <-- KSTREAM-MERGE-06
> {code}
> h3. Actual result
> NullPointerException was thrown with the following stacktrace.
> {code:java}
> Exception in thread "main" java.lang.NullPointerException
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeUpdateKeyChangingRepartitionNodeMap(InternalStreamsBuilder.java:397)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybeOptimizeRepartitionOperations(InternalStreamsBuilder.java:315)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.maybePerformOptimizations(InternalStreamsBuilder.java:304)
>   at 
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder.buildAndOptimizeTopology(InternalStreamsBuilder.java:275)
>   at 
> org.apache.kafka.streams.StreamsBuilder.build(StreamsBuilder.java:558)
>   at Main.main(Main.java:24){code}
> h3. Cause
> This exception occurs in 
> InternalStreamsBuilder#maybeUpdateKeyChaingingRepartitionNodeMap.
> {code:java}
> private void maybeUpdateKeyChangingRepartitionNodeMap() {
> final Map<StreamsGraphNode, LinkedHashSet<StreamsGraphNode>> 
> mergeNodesToKeyChangers = new HashMap<>();
> for (final StreamsGraphNode mergeNode : mergeNodes) {
> mergeNodesToKeyChangers.put(mergeNode, new LinkedHashSet<>());
> final Collection<StreamsGraphNode> keys = 
> keyChangingOperationsToOptimizableRepartitionNodes.keySet();
>  

[jira] [Updated] (KAFKA-9302) Allow custom partitioning in table-table joins

2019-12-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9302:
---
Description: 
For KTables, Kafka Streams assumes that both input tables are partitioned by 
their key. This is a natural choice that works well for primary-key table-table 
joins. However, for foreign-key joins (as introduced in 2.4 via KIP-213 / 
KAFKA-3705), this assumption is actually quite strict.

If we relax this requirement, we would need to consider the following:
 * For primary key joins, the assumption is that both input topics use the same 
number of partitions and the same partitioning strategy based on the primary 
key. Kafka Streams can only check at runtime whether the number of partitions 
matches and optimistically assumes that the partitioning strategy is correct. 
This allows for a distributed join computation with one task per input topic 
partition-pair.

If we relax this requirement, we would need to allow users to pass in a custom 
partitioner and would need to repartition one table before we can execute the 
join. Hence, while this feature would make Kafka Streams more flexible, it's an 
expensive operation. Note that users can do the repartitioning "upstream" 
manually if required. However, if we consider the next bullet point, we might 
want to have built-in support for this case to lift the burden from the user.

 
 * For foreign-key joins, the same assumption as for primary-key joins is made. 
This implies that we always need to repartition data both ways to send 
subscription requests and subscription responses to the correct tasks. At the 
same time, if the input KTables used a custom primary-key partitioning 
strategy, the join would fail. Hence, there are two potential improvements:

 ## Allow users to pass in a custom (primary-key based) partitioner that can 
be used to write into the subscription request and subscription response 
repartition topics correctly (if the "receiver side" table is partitioned by a 
custom strategy). This would be a straightforward extension of the existing 
foreign-key join.
 ## Assuming that the right-hand side uses the same number of partitions as 
the left-hand side, and assuming that the left-hand side is not partitioned by 
its primary key but by the join attribute (i.e., the FK extracted from the 
value)*, no repartitioning would be required (i.e., no subscription requests and 
responses), and the join could be executed in a single task per input topic 
partition-pair.



*) Note that the assumption that the right-hand side is partitioned not by its 
own primary key but by the primary key of the left-hand side does not make much 
sense. While this would indeed allow a co-partitioned execution without the need 
to repartition the data, the right-hand side does not know the primary key of 
the left-hand side, so such a partitioning cannot be achieved.

  was:
In the 2.4 release, we introduced foreign-key joins via KIP-213 (KAFKA-3705). 
However, the API does not allow working with a custom partitioning strategy, 
i.e., it always assumes that both input tables use the default partitioner.

We should allow passing in a custom partitioner for each side that can be used 
when writing to the repartition topics.


> Allow custom partitioning in table-table joins
> --
>
> Key: KAFKA-9302
> URL: https://issues.apache.org/jira/browse/KAFKA-9302
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> For KTables, Kafka Streams assumes that both input tables are partitioned by 
> their key. This is a natural choice that works well for primary-key 
> table-table joins. However, for foreign-key joins (as introduced in 2.4 via 
> KIP-213 / KAFKA-3705), this assumption is actually quite strict.
> If we relax this requirement, we would need to consider the following:
>  * For primary key joins, the assumption is that both input topics use the 
> same number of partitions and the same partitioning strategy based on the 
> primary key. Kafka Streams can only check at runtime whether the number of 
> partitions matches and optimistically assumes that the partitioning strategy is 
> correct. This allows for a distributed join computation with one task per 
> input topic partition-pair.
> If we relax this requirement, we would need to allow users to pass in a 
> custom partitioner and would need to repartition one table before we can 
> execute the join. Hence, while this feature would make Kafka Streams more 
> flexible, it's an expensive operation. Note that users can do the 
> repartitioning "upstream" manually if required. However, if we consider the 
> next bullet point, we might want to have built-in support for 

[jira] [Commented] (KAFKA-9259) suppress() for windowed-Serdes does not work with default serdes

2019-12-16 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997447#comment-16997447
 ] 

John Roesler commented on KAFKA-9259:
-

Hi [~omanges], yes, of course!

Although this work doesn't need a KIP, it's probably worth having a discussion 
here about the approach before you send a PR, just because it seems like 
different folks have different opinions about the best way to implement it.

Can you please also take a look at 
https://issues.apache.org/jira/browse/KAFKA-9260 for context? And please let us 
know if you need any clarification.

Thanks!
-John

> suppress() for windowed-Serdes does not work with default serdes
> 
>
> Key: KAFKA-9259
> URL: https://issues.apache.org/jira/browse/KAFKA-9259
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Matthias J. Sax
>Assignee: Omkar Mestry
>Priority: Major
>  Labels: newbie
>
> The suppress() operator either inherits serdes from its upstream operator or 
> falls back to default serdes from the config.
> If the upstream operator is a windowed aggregation, the window-aggregation 
> operator wraps the user passed-in serde with a window-serde and pushes it 
> into suppress() – however, if default serdes are used, the window-aggregation 
> operator cannot push anything into suppress(). At runtime, it just creates a 
> default serde and wraps it accordingly. For this case, suppress() also falls 
> back to default serdes; however, it does not wrap the serde and thus a 
> ClassCastException is thrown when the serde is used later.
> suppress() is already aware of whether the upstream aggregation is time/session 
> windowed or not and thus should use this information to wrap default serdes 
> accordingly.
> The current workaround for windowed-suppress is to overwrite the default 
> serde upstream to suppress(), such that suppress() inherits serdes and does 
> not fall back to default serdes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9302) Allow custom partitioning in table-table joins

2019-12-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-9302:
---
Summary: Allow custom partitioning in table-table joins  (was: Allow custom 
partitioning in foreign-key joins)

> Allow custom partitioning in table-table joins
> --
>
> Key: KAFKA-9302
> URL: https://issues.apache.org/jira/browse/KAFKA-9302
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> In the 2.4 release, we introduced foreign-key joins via KIP-213 (KAFKA-3705). 
> However, the API does not allow working with a custom partitioning strategy, 
> i.e., it always assumes that both input tables use the default partitioner.
> We should allow passing in a custom partitioner for each side that can be 
> used when writing to the repartition topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9302) Allow custom partitioning in foreign-key joins

2019-12-16 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9302?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997432#comment-16997432
 ] 

John Roesler commented on KAFKA-9302:
-

Thanks for the report, [~mjsax]. It also comes to mind that we neglected to 
allow customizing the internal store implementation (it's always rocksdb, and 
the cache is disabled). Maybe we can design a config object solution to all of 
this together.

> Allow custom partitioning in foreign-key joins
> --
>
> Key: KAFKA-9302
> URL: https://issues.apache.org/jira/browse/KAFKA-9302
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> In the 2.4 release, we introduced foreign-key joins via KIP-213 (KAFKA-3705). 
> However, the API does not allow working with a custom partitioning strategy, 
> i.e., it always assumes that both input tables use the default partitioner.
> We should allow passing in a custom partitioner for each side that can be 
> used when writing to the repartition topics.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9302) Allow custom partitioning in foreign-key joins

2019-12-16 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-9302:
--

 Summary: Allow custom partitioning in foreign-key joins
 Key: KAFKA-9302
 URL: https://issues.apache.org/jira/browse/KAFKA-9302
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Matthias J. Sax


In the 2.4 release, we introduced foreign-key joins via KIP-213 (KAFKA-3705). 
However, the API does not allow working with a custom partitioning strategy, 
i.e., it always assumes that both input tables use the default partitioner.

We should allow passing in a custom partitioner for each side that can be used 
when writing to the repartition topics.
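For context, here is a minimal sketch of the existing 2.4 foreign-key join API that such a partitioner option would extend (topic names, value types, and the join logic are assumptions for illustration); note there is currently no parameter for a custom StreamPartitioner on the internal subscription/response repartition topics:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class ForeignKeyJoinExample {
    public static void build(final StreamsBuilder builder) {
        // Left table: order-id -> customer-id (the foreign key lives in the value).
        final KTable<String, String> orders =
            builder.table("orders", Consumed.with(Serdes.String(), Serdes.String()));
        // Right table: customer-id -> customer name.
        final KTable<String, String> customers =
            builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

        // KIP-213 foreign-key join (2.4). Both tables are assumed to use the
        // default partitioner; there is currently no way to override the
        // partitioner used for the internal repartition topics.
        final KTable<String, String> enriched = orders.join(
            customers,
            orderValue -> orderValue,                 // foreign-key extractor (customer id)
            (order, customer) -> order + "/" + customer);

        enriched.toStream().to("enriched-orders",
            Produced.with(Serdes.String(), Serdes.String()));
    }
}
{code}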



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9300) Create a topic based on the specified brokers

2019-12-16 Thread weiwei (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9300?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

weiwei updated KAFKA-9300:
--
Affects Version/s: (was: 2.2.2)
   2.3.0

> Create a topic based on the specified brokers
> -
>
> Key: KAFKA-9300
> URL: https://issues.apache.org/jira/browse/KAFKA-9300
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.3.0
>Reporter: weiwei
>Assignee: weiwei
>Priority: Major
> Fix For: 2.4.0
>
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> Generally, a Kafka cluster serves multiple businesses. To reduce the impact 
> on other businesses, many companies isolate brokers to physically isolate 
> businesses; that is, the topics of certain businesses are created on 
> specified brokers. The current topic creation script only supports creating a 
> topic according to an explicit replica-assignment, which is not convenient for 
> a service that just wants to specify the brokers. Therefore, this function 
> should be added as follows: create a topic based on the specified brokers. A 
> replica-assignment-brokers parameter is added to indicate the broker range of 
> the topic distribution. If this parameter is not set, all broker nodes in the 
> cluster are used. For example, kafka-topics.sh --create --topic test06 
> --partitions 2 --replication-factor 1 --zookeeper zkurl 
> --replica-assignment-brokers=1,2.
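Until such an option exists, broker placement can already be controlled with an explicit replica assignment; a minimal AdminClient sketch (broker ids, topic name, and bootstrap server are assumptions for illustration):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicOnBrokers {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Pin the two partitions of "test06" to brokers 1 and 2 only
        // (partition id -> ordered list of replica broker ids).
        final Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(0, Arrays.asList(1));
        assignment.put(1, Arrays.asList(2));

        try (AdminClient admin = AdminClient.create(props)) {
            admin.createTopics(Collections.singleton(new NewTopic("test06", assignment)))
                 .all()
                 .get();
        }
    }
}
{code}

The proposed replica-assignment-brokers option would essentially compute such an assignment automatically over the given broker set.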



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9300) Create a topic based on the specified brokers

2019-12-16 Thread weiwei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997347#comment-16997347
 ] 

weiwei commented on KAFKA-9300:
---

[~huxi_2b] I plan to add a topic-creation option 'replica-assignment-brokers'; 
the underlying layer would still invoke AdminUtils.assignReplicasToBrokers(). I 
do not think a public API modification is involved, so a KIP may not be 
required.

> Create a topic based on the specified brokers
> -
>
> Key: KAFKA-9300
> URL: https://issues.apache.org/jira/browse/KAFKA-9300
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.2.2
>Reporter: weiwei
>Assignee: weiwei
>Priority: Major
> Fix For: 2.4.0
>
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> Generally, a Kafka cluster serves multiple businesses. To reduce the impact 
> on other businesses, many companies isolate brokers to physically isolate 
> businesses; that is, the topics of certain businesses are created on 
> specified brokers. The current topic creation script only supports creating a 
> topic according to an explicit replica-assignment, which is not convenient for 
> a service that just wants to specify the brokers. Therefore, this function 
> should be added as follows: create a topic based on the specified brokers. A 
> replica-assignment-brokers parameter is added to indicate the broker range of 
> the topic distribution. If this parameter is not set, all broker nodes in the 
> cluster are used. For example, kafka-topics.sh --create --topic test06 
> --partitions 2 --replication-factor 1 --zookeeper zkurl 
> --replica-assignment-brokers=1,2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9148) Consider forking RocksDB for Streams

2019-12-16 Thread adam Retter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997207#comment-16997207
 ] 

adam Retter edited comment on KAFKA-9148 at 12/16/19 11:59 AM:
---

Sorry to hear about your troubles, I would like to put forward a case for not 
forking RocksDB:
{quote}We can avoid passing sudden breaking changes on to our users, such 
removal of methods with no deprecation period (see discussion on KAFKA-8897)
{quote}
>From KAFKA-8897:
{quote}They never deprecated it. CompactionOptionsFIFO#setTtl() is there in 
5.18.3, but is removed in 6.0.
{quote}
If we were following SemVer 2, then it is fine/expected to make breaking API 
changes in Major version changes. Regardless, the API of RocksJava closely 
follows the C++ API, if it was not deprecated in the C++ API, then it would not 
have been deprecated in the Java API.
{quote}Support for some architectures does not exist in all RocksDB versions, 
making Streams completely unusable for some users until we can upgrade the 
rocksdb dependency to one that supports their specific case
{quote}
As well as x86 and x86_64, we have added ppc64le and aarch64 recently, and 
these are both shipped since 6.4.6 (ppc64le actually for much longer). We 
have very recently also added support for distributions using musl libc instead of 
glibc, and that will be available for x86, x86_64, ppc64le, and aarch64 in our 
next release.

Oh... and we are experimenting with s390x at the moment too!
{quote}The Java API seems to be a very low priority to the rocksdb folks.
{quote}
The development of the Java API has fewer resources allocated to it, but is 
still important. One problem is that we receive very little feedback about the 
Java API, so it is hard to know who is using it and the issues they have. For 
example, I only just discovered this discussion about it!

I can also personally apologise. I get bombarded with things from many 
directions, and sometimes things fall through the cracks. If you can keep 
on-top-of your issues and remind me, then I will try and help where I can.
{quote}They leave out critical functionality, features, and configuration 
options that have been in the c++ API for a very long time
{quote}
The Java API tries to mirror the C++ API, and for the most part we have most of 
the functionality that users expect. The C++ API changes very rapidly, with 
things constantly being added, removed, re-added and modified. Some of these 
things are "experimental". In some ways, it is good that the Java API is a 
little behind the C++ API for stability. We try not to miss critical 
functionality though, and when this is raised as an issue we try to address it 
quickly. It is also worth adding that some things don't make sense to port into 
the Java API, this might be due to performance and language constraints. If you 
feel that there are critical things missing please open an issue for each one 
([https://github.com/facebook/rocksdb/issues]) and mention `@adamretter`.
{quote}Those that do make it over often have random gaps in the API such as 
setters but no getters (see [rocksdb PR 
#5186|https://github.com/facebook/rocksdb/pull/5186])
{quote}
This was a contribution from an external developer, and it was deemed that it 
improved the current situation even if it was not perfect. Please feel free to 
send PRs yourselves that also improve the situation further.
{quote}Others are poorly designed and require too many trips across the JNI, 
making otherwise incredibly useful features prohibitively expensive.
{quote}
Crossing the JNI boundary is expensive. That is a fact that we cannot ourselves 
change, this is down to those that specify the JVM and JNI fundamentals and 
also the JVM implementers. We can of course try to design our APIs to 
ameliorate these where possible. There is currently ongoing work in this area, 
some of which will be merged before the new year.
{quote}Custom comparator: a custom comparator could significantly improve the 
performance of session windows. This is trivial to do but given the high 
performance cost of crossing the jni, it is currently only practical to use a 
c++ comparator
{quote}
This is really a fundamental JNI restriction. Things like Comparator and Merge 
Operator are callback based C++ APIs, and crossing that boundary into Java on 
every callback is expensive. We have some improvements to Java Comparators 
which will come in hopefully before the new year to remove the mutex around 
Slice reuse.

For you it would be sensible to implement your performance critical comparators 
in C++ and ship them as a native library. I believe there is some work 
in-progress for RocksDB to gain a mechanism for a module system to allow 
loading C++ comparators and merge operators dynamically. We could potentially 
expose that from the Java API to allow you to load your native comparators.
{quote}[Prefix 

[jira] [Commented] (KAFKA-9225) kafka fail to run on linux-aarch64

2019-12-16 Thread adam Retter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997208#comment-16997208
 ] 

adam Retter commented on KAFKA-9225:


For aarch64 I would suggest using 6.4.6+.

 

Also, most of the changes between 5.18.3 and 6.4.6 are around database options; 
the core (most frequently used) options have not changed a great deal, so I 
would expect to see little real-world application breakage in practice.

> kafka fail to run on linux-aarch64
> --
>
> Key: KAFKA-9225
> URL: https://issues.apache.org/jira/browse/KAFKA-9225
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: jiamei xie
>Priority: Blocker
>  Labels: incompatible
> Fix For: 3.0.0
>
> Attachments: compat_report.html
>
>
> *Steps to reproduce:*
> 1. Download Kafka latest source code
> 2. Build it with gradle
> 3. Run 
> [streamDemo|[https://kafka.apache.org/23/documentation/streams/quickstart]]
> when running bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo, it crashed with 
> the following error message
> {code:java}
> xjm@ubuntu-arm01:~/kafka$bin/kafka-run-class.sh 
> org.apache.kafka.streams.examples.wordcount.WordCountDemo
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/core/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/tools/build/dependant-libs-2.12.10/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/api/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/transforms/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/runtime/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/file/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/mirror-client/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/json/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in 
> [jar:file:/home/xjm/kafka/connect/basic-auth-extension/build/dependant-libs/slf4j-log4j12-1.7.28.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an 
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> [2019-11-19 15:42:23,277] WARN The configuration 'admin.retries' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:23,278] WARN The configuration 'admin.retry.backoff.ms' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2019-11-19 15:42:24,278] ERROR stream-client 
> [streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48] All stream threads 
> have died. The instance will be in error state and should be closed. 
> (org.apach e.kafka.streams.KafkaStreams)
> Exception in thread 
> "streams-wordcount-0f3cf88b-e2c4-4fb6-b7a3-9754fad5cd48-StreamThread-1" 
> java.lang.UnsatisfiedLinkError: /tmp/librocksdbjni1377754636857652484.so: 
> /tmp/librocksdbjni13777546368576524 84.so: 
> cannot open shared object file: No such file or directory (Possible cause: 
> can't load AMD 64-bit .so on a AARCH64-bit platform)
> {code}
> *Analysis:*
> This issue is caused by rocksdbjni-5.18.3.jar, which doesn't come with aarch64 
> native support. Replacing rocksdbjni-5.18.3.jar with rocksdbjni-6.3.6.jar from 
> [https://mvnrepository.com/artifact/org.rocksdb/rocksdbjni/6.3.6] fixes 
> this problem.
> Attached is the binary compatibility report of rocksdbjni.jar between 5.18.3 
> and 6.3.6. The result is 81.8%. So, is it possible to upgrade rocksdbjni to 
> 6.3.6 upstream? If there are any tests to execute, please 
> kindly point me to them. Thanks a lot.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9148) Consider forking RocksDB for Streams

2019-12-16 Thread adam Retter (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9148?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997207#comment-16997207
 ] 

adam Retter commented on KAFKA-9148:


Sorry to hear about your troubles, I would like to put forward a case for not 
forking RocksDB:

{quote}
 We can avoid passing sudden breaking changes on to our users, such removal of 
methods with no deprecation period (see discussion on KAFKA-8897)
{quote}

>From KAFKA-8897:
{quote}
They never deprecated it. CompactionOptionsFIFO#setTtl() is there in 5.18.3, 
but is removed in 6.0.
{quote}

If we were following SemVer 2, then it is fine/expected to make breaking API 
changes in Major version changes. Regardless, the API of RocksJava closely 
follows the C++ API, if it was not deprecated in the C++ API, then it would not 
have been deprecated in the Java API. 


{quote}
Support for some architectures does not exist in all RocksDB versions, making 
Streams completely unusable for some users until we can upgrade the rocksdb 
dependency to one that supports their specific case
{quote}

As well as x86 and x86_64, we have added ppc64le and aarch64 recently, and 
these are both shipped since 6.4.6 (ppc64le actually for much longer). We 
have very recently also added support for distributions using musl libc instead of 
glibc, and that will be available for x86, x86_64, ppc64le, and aarch64 in our 
next release.

{quote}
The Java API seems to be a very low priority to the rocksdb folks.
{quote}

The development of the Java API has fewer resources allocated to it, but is 
still important. One problem is that we receive very little feedback about the 
Java API, so it is hard to know who is using it and the issues they have. For 
example, I only just discovered this discussion about it!

I can also personally apologise. I get bombarded with things from many 
directions, and sometimes things fall through the cracks. If you can keep 
on-top-of your issues and remind me, then I will try and help where I can.

{quote}
They leave out critical functionality, features, and configuration options that 
have been in the c++ API for a very long time
{quote}

The Java API tries to mirror the C++ API, and for the most part we have most of 
the functionality that users expect. The C++ API changes very rapidly, with 
things constantly being added, removed, re-added and modified. Some of these 
things are "experimental". In some ways, it is good that the Java API is a 
little behind the C++ API for stability. We try not to miss critical 
functionality though, and when this is raised as an issue we try to address it 
quickly. It is also worth adding that some things don't make sense to port into 
the Java API, this might be due to performance and language constraints. If you 
feel that there are critical things missing please open an issue for each one 
(https://github.com/facebook/rocksdb/issues) and mention `@adamretter`.

{quote}
Those that do make it over often have random gaps in the API such as setters 
but no getters (see [rocksdb PR 
#5186|https://github.com/facebook/rocksdb/pull/5186])
{quote}
This was a contribution from an external developer, and it was deemed that it 
improved the current situation even if it was not perfect. Please feel free to 
send PRs yourselves that also improve the situation further.


{quote}
Others are poorly designed and require too many trips across the JNI, making 
otherwise incredibly useful features prohibitively expensive.
{quote}

Crossing the JNI boundary is expensive. That is a fact that we cannot ourselves 
change, this is down to those that specify the JVM and JNI fundamentals and 
also the JVM implementers. We can of course try to design our APIs to 
ameliorate these where possible. There is currently ongoing work in this area, 
some of which will be merged before the new year.

{quote}
Custom comparator: a custom comparator could significantly improve the 
performance of session windows. This is trivial to do but given the high 
performance cost of crossing the jni, it is currently only practical to use a 
c++ comparator
{quote}

This is really a fundamental JNI restriction. Things like Comparator and Merge 
Operator are callback based C++ APIs, and crossing that boundary into Java on 
every callback is expensive. We have some improvements to Java Comparators 
which will come in hopefully before the new year to remove the mutex around 
Slice reuse.

For you it would be sensible to implement your performance critical comparators 
in C++ and ship them as a native library. I believe there is some work 
in-progress for RocksDB to gain a mechanism for a module system to allow 
loading C++ comparators and merge operators dynamically. We could potentially 
expose that from the Java API to allow you to load your native comparators.

{quote}
[Prefix Seek|https://github.com/facebook/rocksdb/issues/6004]: not currently 
used by Streams but a commonly requested 

[jira] [Updated] (KAFKA-9297) CreateTopic API do not work with older version of the request/response

2019-12-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-9297:
---
Description: The create topic API does not work with older versions of the 
API. It can be reproduced by trying to create a topic with `kafka-topics.sh` 
from 2.3. It times out.  (was: The create topic API does not work with older 
versions of the API. It can be reproduced by trying to create a topic with 
`kafka-topics.sh` from 2.3. It times out.

The latest version of the response has introduced new fields with default 
values. When those fields are not supported by the version used by the client, 
the serialization mechanism expects the default values and throws 
otherwise. The current implementation in KafkaApis sets them regardless of the 
version used.)
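To illustrate the failure mode described in that removed paragraph, here is a rough sketch of the kind of version gating the fix needs (this is not the actual KafkaApis code; the version-5 cutoff and the helper method are assumptions for illustration):

{code:java}
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.protocol.Errors;

public class VersionGatedResponse {
    // Rough sketch only: populate fields that were added in newer response
    // versions only when the client negotiated such a version; otherwise the
    // serializer sees a non-default value it cannot encode and throws.
    static CreatableTopicResult resultFor(final String topic,
                                          final short requestVersion,
                                          final int numPartitions,
                                          final short replicationFactor) {
        final CreatableTopicResult result = new CreatableTopicResult()
            .setName(topic)
            .setErrorCode(Errors.NONE.code());
        if (requestVersion >= 5) {
            // Assumed here: NumPartitions/ReplicationFactor only exist from v5 of the response.
            result.setNumPartitions(numPartitions)
                  .setReplicationFactor(replicationFactor);
        }
        return result;
    }
}
{code}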

> CreateTopic API do not work with older version of the request/response
> --
>
> Key: KAFKA-9297
> URL: https://issues.apache.org/jira/browse/KAFKA-9297
> Project: Kafka
>  Issue Type: Bug
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> The create topic API does not work with older versions of the API. It can be 
> reproduced by trying to create a topic with `kafka-topics.sh` from 2.3. It 
> times out.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8855) Collect and Expose Client's Name and Version in the Brokers

2019-12-16 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-8855.

Fix Version/s: 2.5.0
   2.4.0
   Resolution: Fixed

The protocol part landed in 2.4.0 and the metric part landed in 2.5.0.

> Collect and Expose Client's Name and Version in the Brokers
> ---
>
> Key: KAFKA-8855
> URL: https://issues.apache.org/jira/browse/KAFKA-8855
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.4.0, 2.5.0
>
>
> Implements KIP-511 as documented here: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-511%3A+Collect+and+Expose+Client%27s+Name+and+Version+in+the+Brokers]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9300) Create a topic based on the specified brokers

2019-12-16 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9300?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16997081#comment-16997081
 ] 

huxihx commented on KAFKA-9300:
---

This might need a KIP.

> Create a topic based on the specified brokers
> -
>
> Key: KAFKA-9300
> URL: https://issues.apache.org/jira/browse/KAFKA-9300
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients
>Affects Versions: 2.2.2
>Reporter: weiwei
>Assignee: weiwei
>Priority: Major
> Fix For: 2.4.0
>
>   Original Estimate: 120h
>  Remaining Estimate: 120h
>
> Generally, a Kafka cluster serves multiple businesses. To reduce the impact 
> on other businesses, many companies isolate brokers to physically isolate 
> businesses; that is, the topics of certain businesses are created on 
> specified brokers. The current topic creation script only supports creating a 
> topic according to an explicit replica-assignment, which is not convenient for 
> a service that just wants to specify the brokers. Therefore, this function 
> should be added as follows: create a topic based on the specified brokers. A 
> replica-assignment-brokers parameter is added to indicate the broker range of 
> the topic distribution. If this parameter is not set, all broker nodes in the 
> cluster are used. For example, kafka-topics.sh --create --topic test06 
> --partitions 2 --replication-factor 1 --zookeeper zkurl 
> --replica-assignment-brokers=1,2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)