[jira] [Commented] (KAFKA-8838) Allow consumer group tool to work with non-existing consumer groups

2019-09-05 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8838?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923917#comment-16923917
 ] 

huxihx commented on KAFKA-8838:
---

Are you using the latest version? I cannot reproduce this problem using 2.3. 

> Allow consumer group tool to work with non-existing consumer groups
> ---
>
> Key: KAFKA-8838
> URL: https://issues.apache.org/jira/browse/KAFKA-8838
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Patrik Kleindl
>Priority: Minor
>
> The streams application reset tool works for non-existing consumer groups and 
> allows offsets to be "pre-set" before a new deployment.
> The consumer group tool does not allow the same, which would be a nice 
> enhancement.
> If this is supposed to work and the NullPointerException is not expected, this 
> can be converted to a bug.
>  
> {code:java}
> ./kafka-consumer-groups --bootstrap-server broker:9092 --group applicationId --reset-offsets --by-duration P60D --topic topic1 --execute
> Error: Executing consumer group command failed due to null
> java.lang.NullPointerException
> 	at scala.collection.convert.Wrappers$JListWrapper.iterator(Wrappers.scala:88)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:477)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$$anonfun$parseTopicPartitionsToReset$1.apply(ConsumerGroupCommand.scala:471)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> 	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.parseTopicPartitionsToReset(ConsumerGroupCommand.scala:471)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.getPartitionsToReset(ConsumerGroupCommand.scala:486)
> 	at kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:310)
> 	at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:64)
> 	at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8818) CreatePartitions Request protocol documentation

2019-09-05 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923907#comment-16923907
 ] 

huxihx commented on KAFKA-8818:
---

Why do you think the array here is invalid? It stores the replica assignment 
for a new partition: each element is a broker id. If the replication factor is 
larger than 1, the assignment must be an array containing all the brokers on 
which the replicas will run.
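For illustration, a minimal AdminClient sketch (topic name, broker ids, and 
bootstrap address are made up) showing that each new partition carries a list 
of broker ids, which is why the field is ARRAY(INT32):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

public class CreatePartitionsExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Grow "my-topic" to 3 partitions. Each new partition gets an
            // explicit replica assignment: a list of broker ids, one per
            // replica (replication factor 2 here, hence two ids per list).
            List<List<Integer>> assignments = Arrays.asList(
                    Arrays.asList(1, 2),  // replicas of the first new partition
                    Arrays.asList(2, 3)); // replicas of the second new partition
            admin.createPartitions(Collections.singletonMap(
                    "my-topic", NewPartitions.increaseTo(3, assignments)))
                 .all().get();
        }
    }
}
{code}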

> CreatePartitions Request protocol documentation
> ---
>
> Key: KAFKA-8818
> URL: https://issues.apache.org/jira/browse/KAFKA-8818
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Fábio Silva
>Priority: Major
>  Labels: documentation, protocol-documentation
>
> CreatePartitions Request protocol documentation contains an invalid type, 
> ARRAY(INT32), for the assignment field; it must be INT32.
> Wrong: 
> {code:java}
> assignment => ARRAY(INT32){code}
> Correct:
> {code:java}
> assignment => INT32
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8813) Race condition when creating topics and changing their configuration

2019-09-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923832#comment-16923832
 ] 

ASF GitHub Bot commented on KAFKA-8813:
---

soondenana commented on pull request #7305: KAFKA-8813: Refresh log config if 
it's updated before initialization
URL: https://github.com/apache/kafka/pull/7305
 
 
   A partition log is initialized in the following steps:
   
   1. Fetch the log config from ZK
   2. Call LogManager.getOrCreateLog, which creates the Log object, then
   3. Register the Log object
   
   Step #3 enables the configuration update thread to deliver configuration
   updates to the log, but any update that arrives between step #1 and step #3
   is missed. That breaks the following use case:
   
   1. Create a topic with the default configuration, and immediately after that
   2. Update the configuration of the topic
   
   There is a race condition here, and in random cases the update made in the
   second step will get dropped.
   
   This change fixes it by tracking updates that arrive between step #1 and
   step #3. Once a Partition is done initializing its log, it checks whether it
   has missed any update; if so, the configuration is read from ZK again. (See
   the sketch below.)
   
   Added unit tests to make sure a dirty configuration is refreshed. Tested
   on a local cluster to make sure that topic configuration and updates are
   handled correctly.
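   A minimal sketch of the shape of the fix (class and method names here are
   hypothetical, not the actual Kafka code): a flag records whether a dynamic
   update arrived during initialization, and the config is re-read once
   registration completes.

{code:java}
public class LogInitializer {
    private boolean configDirty = false;
    private boolean initialized = false;

    // Called by the configuration update thread for every dynamic change.
    public synchronized void onConfigUpdate() {
        if (!initialized) {
            configDirty = true;         // arrived between step #1 and step #3
        } else {
            applyLatestConfigFromZk();
        }
    }

    // Called once the Log object has been created and registered (step #3).
    public synchronized void finishInitialization() {
        initialized = true;
        if (configDirty) {
            configDirty = false;
            applyLatestConfigFromZk();  // re-read so the update is not lost
        }
    }

    private void applyLatestConfigFromZk() { /* fetch from ZK and apply */ }
}
{code}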
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Race condition when creating topics and changing their configuration
> 
>
> Key: KAFKA-8813
> URL: https://issues.apache.org/jira/browse/KAFKA-8813
> Project: Kafka
>  Issue Type: Bug
>Reporter: Gwen Shapira
>Assignee: Vikas Singh
>Priority: Major
>
> In Partition.createLog we do:
> {code:java}
> val props = stateStore.fetchTopicConfig()
> val config = LogConfig.fromProps(logManager.currentDefaultConfig.originals, 
> props)
> val log = logManager.getOrCreateLog(topicPartition, config, isNew, 
> isFutureReplica)
> {code}
> [https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316|https://github.com/apache/kafka/blob/33d06082117d971cdcddd4f01392006b543f3c01/core/src/main/scala/kafka/cluster/Partition.scala#L314-L316]
> Config changes that arrive after configs are loaded from ZK, but before 
> LogManager has added the partition to `futureLogs` or `currentLogs` (where the 
> dynamic config handler picks up topics whose configs need updating), will be lost.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8574) EOS race condition during task transition leads to LocalStateStore truncation in Kafka Streams 2.0.1

2019-09-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8574?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923831#comment-16923831
 ] 

Guozhang Wang commented on KAFKA-8574:
--

Sorry for being late on this ticket.

I've read through the code and I agree with [~wgreerx]'s analysis that this is 
indeed a bug. As for the fix, I think this PR, which was merged in 2.3.0 / 2.2.1, 
should have resolved it, since it now writes a checkpoint file upon suspension 
even with EOS turned on; hence we tackle this issue by fixing the second 
step: "Thread-A is suspended on 1. This does not write a checkpoint file 
because EOS is enabled [1]".

https://github.com/apache/kafka/commit/1f9aa01a5b3b59d90499a059d719af03483d5130
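
In sketch form, the behavioral change looks like this (hypothetical names, not 
the actual StreamTask code):

{code:java}
// A minimal sketch of the fix: write the checkpoint on suspend even when
// EOS is enabled, so the on-disk file always reflects the store's state.
class TaskCheckpointSketch {

    void suspendBeforeFix(boolean eosEnabled) {
        flushState();
        if (!eosEnabled) {       // EOS used to skip the checkpoint,
            writeCheckpoint();   // leaving a stale file for the next owner
        }
    }

    void suspendAfterFix(boolean eosEnabled) {
        flushState();
        writeCheckpoint();       // always written, regardless of EOS
    }

    private void flushState() { /* flush state stores and producer */ }

    private void writeCheckpoint() { /* persist changelog offsets to disk */ }
}
{code}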

> EOS race condition during task transition leads to LocalStateStore truncation 
> in Kafka Streams 2.0.1
> 
>
> Key: KAFKA-8574
> URL: https://issues.apache.org/jira/browse/KAFKA-8574
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: William Greer
>Priority: Major
>
> *Overview*
>  While using EOS in Kafka Streams there is a race condition where the 
> checkpoint file is written by the previous owning thread (Thread A) after the 
> new owning thread (Thread B) reads the checkpoint file. Thread B then starts 
> a restoration since no checkpoint file was found. A re-balance occurs before 
> Thread B completes the restoration, and a third thread (Thread C) becomes the 
> owning thread. Thread C reads the checkpoint file written by Thread A, which 
> does not correspond to the current state of the RocksDB state store. When 
> this race condition occurs the state store will have the most recent records 
> and some amount of the oldest records, but will be missing some amount of 
> records in between. If A->Z represents the entire changelog to the present, 
> then when this scenario occurs the state store would contain records [A->K 
> and Y->Z], where the state store is missing records K->Y.
>   
>  This race condition is possible due to dirty writes and dirty reads of the 
> checkpoint file.
>   
>  *Example:*
>  Thread refers to a Kafka Streams StreamThread [0]
>  Thread A, B and C are running in the same JVM in the same streams 
> application.
>   
>  Scenario:
>  Thread-A is in RUNNING state and up to date on partition 1.
>  Thread-A is suspended on 1. This does not write a checkpoint file because 
> EOS is enabled [1]
>  Thread-B is assigned to 1
>  Thread-B does not find checkpoint in StateManager [2]
>  Thread-A is assigned a different partition. Task writes suspended tasks 
> checkpoints to disk. Checkpoint for 1 is written. [3]
>  Thread-B deletes LocalStore and starts restoring. The deletion of the 
> LocalStore does not delete checkpoint file. [4]
>  Thread-C is revoked
>  Thread-A is revoked
>  Thread-B is revoked from the assigned status. Does not write a checkpoint 
> file
>  - Note Thread-B never reaches the running state, it remains in the 
> PARTITIONS_ASSIGNED state until it transitions to the PARTITIONS_REVOKED state
> Thread-C is assigned 1
>  Thread-C finds the checkpoint in StateManager. This checkpoint corresponds to 
> where Thread-A left the state store for partition 1, not to where Thread-B 
> left the state store.
>  Thread-C begins restoring from the checkpoint. The state store is missing an 
> unknown number of records at this point.
>  Thread-B, once assigned, does not write a checkpoint file for partition 1, 
> because it had not reached a running status before being revoked.
>   
>  [0] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java]
>  [1] 
> [https://github.com/apache/kafka/blob/2.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L522-L553]
>  [2] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L98]
>  [3] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L104-L105]
>  & 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java#L316-L331]
>  [4] 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java#L228]
>  & 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L62-L123]
>  Specifically 
> [https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java#L107-L119]
>  is where the state store is 

[jira] [Commented] (KAFKA-8880) Augment Consumer.committed(partition) to allow multiple partitions

2019-09-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923822#comment-16923822
 ] 

ASF GitHub Bot commented on KAFKA-8880:
---

guozhangwang commented on pull request #7304: KAFKA-8880: Add overloaded 
function of Consumer.committed [WIP]
URL: https://github.com/apache/kafka/pull/7304
 
 
   KIP pending.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Augment Consumer.committed(partition) to allow multiple partitions
> --
>
> Key: KAFKA-8880
> URL: https://issues.apache.org/jira/browse/KAFKA-8880
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie++
>
> We've observed that many usages of the consumer.committed call are made not 
> only for one partition, but for a batch of partitions. On the other hand, the 
> OffsetFetchRequest protocol actually allows for multiple partitions within 
> one request.
> I'd propose we add an overloaded function of KafkaConsumer that takes 
> {code}
> Map<TopicPartition, OffsetAndMetadata> committed(Collection<TopicPartition> 
> partitions, final Duration timeout)
> {code}
> And then deprecate the existing function that only takes one partition.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-7895) KTable suppress operator emitting more than one record for the same key per window

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7895:
---
Fix Version/s: 2.3.0

> KTable suppress operator emitting more than one record for the same key per 
> window
> -
>
> Key: KAFKA-7895
> URL: https://issues.apache.org/jira/browse/KAFKA-7895
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.1.1
>Reporter: prasanthi
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.3.0, 2.1.2, 2.2.1
>
>
> Hi, we are using Kafka Streams to get the aggregated counts per vendor (key) 
> within a specified window.
> Here's how we configured the suppress operator to emit one final record per 
> key/window.
> {code:java}
> KTable<Windowed<Integer>, Long> windowedCount = groupedStream
>  .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(ofMillis(5L)))
>  .count(Materialized.with(Serdes.Integer(),Serdes.Long()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()));
> {code}
> But we are getting more than one record for the same key/window as shown 
> below.
> {code:java}
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1039
> [KTABLE-TOSTREAM-10]: [131@154906704/154906710], 1162
> [KTABLE-TOSTREAM-10]: [9@154906704/154906710], 6584
> [KTABLE-TOSTREAM-10]: [88@154906704/154906710], 107
> [KTABLE-TOSTREAM-10]: [108@154906704/154906710], 315
> [KTABLE-TOSTREAM-10]: [119@154906704/154906710], 119
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 746
> [KTABLE-TOSTREAM-10]: [154@154906704/154906710], 809{code}
> Could you please take a look?
> Thanks
>  
>  
> Added by John:
> Acceptance Criteria:
>  * add suppress to system tests, such that it's exercised with crash/shutdown 
> recovery, rebalance, etc.
>  ** [https://github.com/apache/kafka/pull/6278]
>  * make sure that there's some system test coverage with caching disabled.
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7943
>  * test with tighter time bounds with windows of say 30 seconds and use 
> system time without adding any extra time for verification
>  ** Follow-on ticket: https://issues.apache.org/jira/browse/KAFKA-7944



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-4729) Stores for kstream-kstream join cannot be in-memory

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-4729.

Resolution: Duplicate

> Stores for kstream-kstream join cannot be in-memory
> ---
>
> Key: KAFKA-4729
> URL: https://issues.apache.org/jira/browse/KAFKA-4729
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Eno Thereska
>Priority: Major
>
> Whereas we can specify in the DSL that stores used for aggregates can be 
> RocksDB-based or in-memory, we cannot do that for stores used for 
> KStream-KStream joins. E.g., the join() method in KStreamImpl.java creates 
> two state stores, and the user does not have the option of having them be 
> in-memory:
> StateStoreSupplier thisWindow =
>     createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store");
> StateStoreSupplier otherWindow =
>     createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store");
> Part of the problem is that for joins, stores are not exposed to the user. We 
> might want to rethink that.
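
For contrast, a minimal sketch of how the DSL does allow the store choice for 
aggregations (using the newer Materialized API; store and topic names are made 
up), an option the two join stores above lack:

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

public class InMemoryAggregateExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");
        // The aggregation's backing store is chosen explicitly: in-memory
        // instead of the default RocksDB store.
        KTable<String, Long> counts = stream
                .groupByKey()
                .count(Materialized.<String, Long>as(
                                Stores.inMemoryKeyValueStore("counts-store"))
                        .withKeySerde(Serdes.String())
                        .withValueSerde(Serdes.Long()));
    }
}
{code}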



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8880) Augment Consumer.committed(partition) to allow multiple partitions

2019-09-05 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-8880:


 Summary: Augment Consumer.committed(partition) to allow multiple 
partitions
 Key: KAFKA-8880
 URL: https://issues.apache.org/jira/browse/KAFKA-8880
 Project: Kafka
  Issue Type: Improvement
  Components: consumer
Reporter: Guozhang Wang


We've observed that many usages of the consumer.committed call are made not only 
for one partition, but for a batch of partitions. On the other hand, the 
OffsetFetchRequest protocol actually allows for multiple partitions within one 
request.

I'd propose we add an overloaded function of KafkaConsumer that takes 

{code}
Map<TopicPartition, OffsetAndMetadata> committed(Collection<TopicPartition> 
partitions, final Duration timeout)
{code}

And then deprecate the existing function that only takes one partition.
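
A usage sketch of the proposed overload (the batched committed() method is the 
proposal of this ticket, not an API that exists yet; broker address, group id, 
and topic names are made up):

{code:java}
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittedBatchExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my-group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            Set<TopicPartition> partitions = new HashSet<>(Arrays.asList(
                    new TopicPartition("orders", 0),
                    new TopicPartition("orders", 1)));
            // One OffsetFetchRequest for the whole batch instead of one
            // round trip per partition:
            Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(partitions, Duration.ofSeconds(5));
            committed.forEach((tp, oam) -> System.out.println(tp + " -> " + oam));
        }
    }
}
{code}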



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-1534) transient unit test failure in testBasicPreferredReplicaElection

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-1534:
---
Labels: flaky-test newbie  (was: newbie transient-unit-test-failure)

> transient unit test failure in testBasicPreferredReplicaElection
> 
>
> Key: KAFKA-1534
> URL: https://issues.apache.org/jira/browse/KAFKA-1534
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, unit tests
>Affects Versions: 0.8.2.0, 2.4.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: flaky-test, newbie
>
> Saw the following transient failure. 
> kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
> junit.framework.AssertionFailedError: Timing out after 5000 ms since 
> leader is not elected or changed for partition [test,1]
> at junit.framework.Assert.fail(Assert.java:47)
> at 
> kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:542)
> at 
> kafka.admin.AdminTest.testBasicPreferredReplicaElection(AdminTest.scala:310)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-1534) transient unit test failure in testBasicPreferredReplicaElection

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-1534:
---
Parent: (was: KAFKA-2054)
Issue Type: Bug  (was: Sub-task)

> transient unit test failure in testBasicPreferredReplicaElection
> 
>
> Key: KAFKA-1534
> URL: https://issues.apache.org/jira/browse/KAFKA-1534
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 0.8.2.0, 2.4.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: flaky-test, newbie
>
> Saw the following transient failure. 
> kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
> junit.framework.AssertionFailedError: Timing out after 5000 ms since 
> leader is not elected or changed for partition [test,1]
> at junit.framework.Assert.fail(Assert.java:47)
> at 
> kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:542)
> at 
> kafka.admin.AdminTest.testBasicPreferredReplicaElection(AdminTest.scala:310)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-1534) transient unit test failure in testBasicPreferredReplicaElection

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-1534:
---
Component/s: unit tests

> transient unit test failure in testBasicPreferredReplicaElection
> 
>
> Key: KAFKA-1534
> URL: https://issues.apache.org/jira/browse/KAFKA-1534
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core, unit tests
>Affects Versions: 0.8.2.0, 2.4.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: newbie, transient-unit-test-failure
>
> Saw the following transient failure. 
> kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
> junit.framework.AssertionFailedError: Timing out after 5000 ms since 
> leader is not elected or changed for partition [test,1]
> at junit.framework.Assert.fail(Assert.java:47)
> at 
> kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:542)
> at 
> kafka.admin.AdminTest.testBasicPreferredReplicaElection(AdminTest.scala:310)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Reopened] (KAFKA-1534) transient unit test failure in testBasicPreferredReplicaElection

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reopened KAFKA-1534:

  Assignee: (was: Abhishek Sharma)

Failed again: 
[https://builds.apache.org/blue/organizations/jenkins/kafka-2.3-jdk8/detail/kafka-2.3-jdk8/101/pipeline]

> transient unit test failure in testBasicPreferredReplicaElection
> 
>
> Key: KAFKA-1534
> URL: https://issues.apache.org/jira/browse/KAFKA-1534
> Project: Kafka
>  Issue Type: Sub-task
>  Components: core
>Affects Versions: 0.8.2.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: newbie, transient-unit-test-failure
>
> Saw the following transient failure. 
> kafka.admin.AdminTest > testBasicPreferredReplicaElection FAILED
> junit.framework.AssertionFailedError: Timing out after 5000 ms since 
> leader is not elected or changed for partition [test,1]
> at junit.framework.Assert.fail(Assert.java:47)
> at 
> kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:542)
> at 
> kafka.admin.AdminTest.testBasicPreferredReplicaElection(AdminTest.scala:310)



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6460:
---
Labels: kip newbie++  (was: newbie++)

> Add mocks for state stores used in Streams unit testing
> ---
>
> Key: KAFKA-6460
> URL: https://issues.apache.org/jira/browse/KAFKA-6460
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: kip, newbie++
>
> We'd like to use mocks for the different types of state stores (kv, window, 
> session) that can record the expected put / get calls in DSL operator unit 
> testing. This involves implementing the two interfaces {{StoreSupplier}} and 
> {{StoreBuilder}} so that they can return an object created from, say, EasyMock, 
> and the object can then be set up with the expected calls.
> In addition, we should also add a mock record collector which can be returned 
> from the mock processor context, so that with a logging-enabled store users can 
> also validate that the changes have been forwarded to the changelog as well.
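
A minimal sketch of the intended usage (assuming EasyMock, as the description 
suggests; the store's type parameters and the scripted values are arbitrary):

{code:java}
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

import org.apache.kafka.streams.state.KeyValueStore;

public class MockStoreSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        // Script the expected interactions on a mocked key-value store.
        KeyValueStore<String, Long> store = createMock(KeyValueStore.class);
        expect(store.get("key")).andReturn(41L);
        store.put("key", 42L);
        replay(store);

        // ... hand `store` to the operator under test and exercise it ...
        store.put("key", store.get("key") + 1L);

        verify(store);  // fails if the scripted calls did not all happen
    }
}
{code}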



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-6460) Add mocks for state stores used in Streams unit testing

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6460?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-6460:
---
Component/s: (was: unit tests)
 (was: streams)
 streams-test-utils

> Add mocks for state stores used in Streams unit testing
> ---
>
> Key: KAFKA-6460
> URL: https://issues.apache.org/jira/browse/KAFKA-6460
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams-test-utils
>Reporter: Guozhang Wang
>Assignee: Yishun Guan
>Priority: Major
>  Labels: kip, newbie++
>
> We'd like to use mocks for the different types of state stores (kv, window, 
> session) that can record the expected put / get calls in DSL operator unit 
> testing. This involves implementing the two interfaces {{StoreSupplier}} and 
> {{StoreBuilder}} so that they can return an object created from, say, EasyMock, 
> and the object can then be set up with the expected calls.
> In addition, we should also add a mock record collector which can be returned 
> from the mock processor context, so that with a logging-enabled store users can 
> also validate that the changes have been forwarded to the changelog as well.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2019-09-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923794#comment-16923794
 ] 

Matthias J. Sax commented on KAFKA-7499:


[~Chilio] – thanks for your interest. I added you to the list of contributors 
and assigned the ticket to you. You can now also self-assign tickets.

To push this over the finish line, we will first need to finish the KIP and get 
it accepted. I would recommend that you read the current KIP and the full 
mailing list discussion to get the context. There are still a few open questions 
that we need to answer before we can VOTE on the KIP.

Let me know if you have any questions.

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Alaa Zbair
>Priority: Major
>  Labels: beginner, kip, newbie
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens within Kafka Streams itself, before the data is 
> handed to the producer, and the producer uses `byte[]/byte[]` key-value-pair 
> types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exceptions, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]
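
For reference, a minimal sketch of a KIP-210 handler as it exists today; a 
failing serializer throws before the producer callback, so serialization 
failures never reach it, which is exactly the gap described above. The class 
name here is made up:

{code:java}
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;

public class SkipBadRecordsHandler implements ProductionExceptionHandler {

    // Only invoked for exceptions surfaced via the producer callback.
    @Override
    public ProductionExceptionHandlerResponse handle(
            ProducerRecord<byte[], byte[]> record, Exception exception) {
        return ProductionExceptionHandlerResponse.CONTINUE; // skip and go on
    }

    @Override
    public void configure(Map<String, ?> configs) { }
}
{code}

It is registered via the `default.production.exception.handler` Streams config.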



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (KAFKA-7499) Extend ProductionExceptionHandler to cover serialization exceptions

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-7499:
--

Assignee: Alaa Zbair

> Extend ProductionExceptionHandler to cover serialization exceptions
> ---
>
> Key: KAFKA-7499
> URL: https://issues.apache.org/jira/browse/KAFKA-7499
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Alaa Zbair
>Priority: Major
>  Labels: beginner, kip, newbie
>
> In 
> [KIP-210|https://cwiki.apache.org/confluence/display/KAFKA/KIP-210+-+Provide+for+custom+error+handling++when+Kafka+Streams+fails+to+produce],
>  an exception handler for the write path was introduced. This exception 
> handler covers exceptions that are raised in the producer callback.
> However, serialization happens within Kafka Streams itself, before the data is 
> handed to the producer, and the producer uses `byte[]/byte[]` key-value-pair 
> types.
> Thus, we might want to extend the ProductionExceptionHandler to cover 
> serialization exceptions, too, to skip over corrupted output messages. An 
> example could be a "String" message that contains invalid JSON and should be 
> serialized as JSON.
> KIP-399 (not voted yet; feel free to pick it up): 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-8793) StickyTaskAssignor throws java.lang.ArithmeticException

2019-09-05 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8793?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8793.
--
Resolution: Not A Problem

> StickyTaskAssignor throws java.lang.ArithmeticException
> ---
>
> Key: KAFKA-8793
> URL: https://issues.apache.org/jira/browse/KAFKA-8793
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Raman Gupta
>Assignee: Guozhang Wang
>Priority: Critical
>
> Occasionally when starting a streams consumer that uses the static consumer 
> group protocol, I get the following error:
> {code:java}
> 2019-08-13 06:06:43,527 ERROR --- [691d2-StreamThread-1] 
> org.apa.kaf.str.pro.int.StreamThread  : stream-thread 
> [prod-cisSegmenter-777489d8-6cc5-48b4-8771-868d873691d2-StreamThread-1] 
> Encountered the following error during processing:
> EXCEPTION: java.lang.ArithmeticException: / by zero
> at 
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor.assignActive(StickyTaskAssignor.java:76)
>  ~[kafka-streams-2.3.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor.assign(StickyTaskAssignor.java:52)
>  ~[kafka-streams-2.3.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:634)
>  ~[kafka-streams-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:424)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:622)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:107)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:544)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:527)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:978)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:958)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:578)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:388)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:415)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
>  ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) 
> ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
> ~[kafka-clients-2.3.0.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
>  ~[kafka-streams-2.3.0.jar:?]
> at 
> 

[jira] [Resolved] (KAFKA-8879) GlobalStateUpdateTask uses wrong javaType to deserialize value

2019-09-05 Thread Vlad Olevsky (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vlad Olevsky resolved KAFKA-8879.
-
Resolution: Invalid

> GlobalStateUpdateTask uses wrong javaType to deserialize value
> --
>
> Key: KAFKA-8879
> URL: https://issues.apache.org/jira/browse/KAFKA-8879
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Vlad Olevsky
>Priority: Major
>
> We read messages from an input topic, transform them 
> (ChannelConfigNew -> ChannelConfig), and send them to another topic:
>  
> {code:java}
> builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
>  Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
>  .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig, 
> prometheusCounter, channelConfigPostDataHelper))
>  
> .to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
>  Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));
> {code}
> where ChannelConfigProcessor (only essential parts are shown)
>  
> {code:java}
> public class ChannelConfigProcessor implements 
> Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {
>  public KeyValue<String, ChannelConfig> transform(String ccid, 
> ChannelConfigNew channelConfigNew) {
>  return new KeyValue<>(ccid, convert(channelConfigNew));
>  }
>  
>  private ChannelConfig convert(ChannelConfigNew channelConfigNew){
>  ...
>  }
> }
>  
> {code}
> Both the input (ChannelConfigNew) and output (ChannelConfig) javaTypes are stored 
> in the headers of the message that is sent to another topic. The input javaType 
> (ChannelConfigNew) is already present in the headers when serialization is called 
> in
> {code:java}
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl::send(final 
> String topic,
>  final K key,
>  final V value,
>  final Headers headers,
>  final Integer partition,
>  final Long timestamp,
> final Serializer<K> keySerializer,
> final Serializer<V> valueSerializer){
>    final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
> }
>  
> {code}
> The output javaType (ChannelConfig) is added to the headers inside the 
> valueSerializer.serialize() method:
> org.springframework.kafka.support.serializer.JsonSerializer
>  
> {code:java}
> @Override
>  public byte[] serialize(String topic, Headers headers, T data) {
>  if (data == null) {
>  return null;
>  }
>  if (this.addTypeInfo && headers != null) {
>  
> this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()),
>  headers);
>  }
>  return serialize(topic, data);
>  }
>  
> {code}
>  
> On the other side we have a GlobalKTable with a processor that retrieves the 
> first javaType (ChannelConfigNew, the wrong one) from the headers and tries to 
> deserialize the data using this type:
>  
> {code:java}
> org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
> public void update(final ConsumerRecord<byte[], byte[]> record) {
> ...
> final ConsumerRecord<Object, Object> deserialized = 
> sourceNodeAndDeserializer.deserialize(processorContext, record);
> ..
> }
> {code}
>  
>  
> sourceNodeAndDeserializer.deserialize calls
> {code:java}
> org.apache.kafka.streams.processor.internals.RecordDeserializer::deserialize()
> {
> sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), 
> rawRecord.value()), rawRecord.headers());
> }
>  
> eventually reaches
> org.springframework.kafka.support.serializer.JsonDeserializer::deserialize(String
>  topic, Headers headers, byte[] data){
>  if (data == null) {
>  return null;
>  }
>  JavaType javaType = this.typeMapper.toJavaType(headers);
>  if (javaType == null) {
>  Assert.state(this.targetType != null, "No type information in headers and no 
> default type provided");
>  return deserialize(topic, data);
>  }
>  else {
>  try {
>  return this.objectMapper.readerFor(javaType).readValue(data);
>  }
>  catch (IOException e) {
>  throw new SerializationException("Can't deserialize data [" + 
> Arrays.toString(data) +
>  "] from topic [" + topic + "]", e);
>  }
>  }
>  }
>  
> {code}
>  
> *JavaType javaType = this.typeMapper.toJavaType(headers)* extracts the first 
> javaType, which is not the one that should be used to deserialize the object (it 
> gets ChannelConfigNew rather than ChannelConfig). As a result the object is 
> not retrieved properly: all fields are null.
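
One workaround consistent with this analysis (assuming a spring-kafka version 
that exposes `JsonSerializer.setAddTypeInfo`; serde construction mirrors the 
snippet above) is to stop the producer-side serializer from appending a second 
type header, so the consumer side no longer picks up the stale one:

{code:java}
import org.springframework.kafka.support.serializer.JsonSerde;

public class NoTypeInfoSerdeExample {
    public static void main(String[] args) {
        JsonSerde<ChannelConfig> serde = new JsonSerde<>(ChannelConfig.class);
        // Do not write the ChannelConfig type header on serialize; the
        // consumer then relies on its configured default target type.
        serde.serializer().setAddTypeInfo(false);
    }

    static class ChannelConfig { }  // stand-in for the real class above
}
{code}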



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-09-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923721#comment-16923721
 ] 

ASF GitHub Bot commented on KAFKA-7149:
---

guozhangwang commented on pull request #7185: KAFKA-7149 : Reducing streams 
assignment data size 
URL: https://github.com/apache/kafka/pull/7185
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Vinoth Chandar
>Priority: Major
>
> We observed that when we have a high number of partitions, instances or 
> stream-threads, the assignment data size grows too fast and we start getting 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around 100MBs of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> pretty quickly with the number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment data. We saw the assignment data size reduced by 8X-10X. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.
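
A minimal sketch of the compression idea (plain java.util.zip here; this is an 
illustration, not Kafka's actual wire format or the patch in the PR above):

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class AssignmentCompression {

    // Gzip the encoded assignment before embedding it in the rebalance
    // metadata; highly repetitive topic and host strings compress very well.
    static byte[] compress(byte[] assignmentData) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
            gzip.write(assignmentData);
        }
        return bos.toByteArray();
    }
}
{code}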



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8878) Flaky Test AssignedStreamsTasksTest#shouldCloseCleanlyWithSuspendedTaskAndEOS

2019-09-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923713#comment-16923713
 ] 

ASF GitHub Bot commented on KAFKA-8878:
---

cpettitt-confluent commented on pull request #7302: KAFKA-8878: Fix flaky test 
AssignedStreamsTasksTest#shouldCloseCleanlyWithSuspendedTaskAndEOS
URL: https://github.com/apache/kafka/pull/7302
 
 
   The previous approach to testing KAFKA-8412 was to look at the logs and
   determine if an error occurred during close. There was no direct way to
   detect that an exception occurred, because the exception was eaten in
   `AssignedTasks.close`. In the PR for that ticket (#7207) it was
   acknowledged that this was a brittle way to test for the exception. We
   now see occasional failures because an unrelated ERROR level log entry
   is made while closing the task.
   
   This change eliminates the brittle log checking by rethrowing any time
   an exception occurs in close, even when a subsequent unclean close
   succeeds. This has the potential benefit of uncovering other suppressed
   exceptions down the road.
   
   I've verified that all tests pass even with us rethrowing on
   `closeUnclean`.
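   
   In sketch form (hypothetical names, not the literal patch):
   
{code:java}
import java.util.List;

class CloseSketch {
    // Close every task; remember the first exception and rethrow it at the
    // end instead of swallowing it, so the failure surfaces directly rather
    // than only as an ERROR log entry.
    static void closeAll(List<AutoCloseable> tasks) {
        RuntimeException firstException = null;
        for (AutoCloseable task : tasks) {
            try {
                task.close();
            } catch (Exception e) {
                if (firstException == null) {
                    firstException = new RuntimeException(e);
                }
                // a best-effort unclean close would happen here
            }
        }
        if (firstException != null) {
            throw firstException;  // what the log check used to approximate
        }
    }
}
{code}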
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Flaky Test AssignedStreamsTasksTest#shouldCloseCleanlyWithSuspendedTaskAndEOS
> -
>
> Key: KAFKA-8878
> URL: https://issues.apache.org/jira/browse/KAFKA-8878
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Chris Pettitt
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3887/tests]
> {quote}java.lang.AssertionError: Expected no ERROR message while closing 
> assignedTasks, but got 1. First error: [AdminClient clientId=adminclient-67] 
> Connection to node -1 (localhost/127.0.0.1:8080) could not be established. 
> Broker may not be available.. Cause: N/A
> at org.junit.Assert.fail(Assert.java:89)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasksTest.shouldCloseCleanlyWithSuspendedTaskAndEOS(AssignedStreamsTasksTest.java:555){quote}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8879) GlobalStateUpdateTask uses wrong javaType to deserialize value

2019-09-05 Thread Vlad Olevsky (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8879?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vlad Olevsky updated KAFKA-8879:

Description: 
We read messages from an input topic, transform them 
(ChannelConfigNew -> ChannelConfig), and send them to another topic:

 
{code:java}
builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
 Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
 .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig, 
prometheusCounter, channelConfigPostDataHelper))
 
.to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
 Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));

{code}
where ChannelConfigProcessor (only essential parts are shown)

 
{code:java}
public class ChannelConfigProcessor implements 
Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {
 public KeyValue<String, ChannelConfig> transform(String ccid, ChannelConfigNew 
channelConfigNew) {
 return new KeyValue<>(ccid, convert(channelConfigNew));
 }
 
 private ChannelConfig convert(ChannelConfigNew channelConfigNew){
 ...
 }

}
 
{code}
Both the input (ChannelConfigNew) and output (ChannelConfig) javaTypes are stored 
in the headers of the message that is sent to another topic. The input javaType 
(ChannelConfigNew) is already present in the headers when serialization is called in
{code:java}
org.apache.kafka.streams.processor.internals.RecordCollectorImpl::send(final 
String topic,
 final K key,
 final V value,
 final Headers headers,
 final Integer partition,
 final Long timestamp,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer){
   final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
}
 
{code}
The output javaType (ChannelConfig) is added to the headers inside the 
valueSerializer.serialize() method:

org.springframework.kafka.support.serializer.JsonSerializer

 
{code:java}
@Override
 public byte[] serialize(String topic, Headers headers, T data) {
 if (data == null) {
 return null;
 }
 if (this.addTypeInfo && headers != null) {
 this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), 
headers);
 }
 return serialize(topic, data);
 }
 
{code}
 

On the other side we have a GlobalKTable with a processor that retrieves the 
first javaType (ChannelConfigNew, the wrong one) from the headers and tries to 
deserialize the data using this type:

 
{code:java}
org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
public void update(final ConsumerRecord<byte[], byte[]> record) {
...
final ConsumerRecord<Object, Object> deserialized = 
sourceNodeAndDeserializer.deserialize(processorContext, record);
..
}
{code}
 

 

sourceNodeAndDeserializer.deserialize calls
{code:java}
org.apache.kafka.streams.processor.internals.RecordDeserializer::deserialize()
{
sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), 
rawRecord.value()), rawRecord.headers());
}
 
eventually reaches
org.springframework.kafka.support.serializer.JsonDeserializer::deserialize(String
 topic, Headers headers, byte[] data){
 if (data == null) {
 return null;
 }
 JavaType javaType = this.typeMapper.toJavaType(headers);
 if (javaType == null) {
 Assert.state(this.targetType != null, "No type information in headers and no 
default type provided");
 return deserialize(topic, data);
 }
 else {
 try {
 return this.objectMapper.readerFor(javaType).readValue(data);
 }
 catch (IOException e) {
 throw new SerializationException("Can't deserialize data [" + 
Arrays.toString(data) +
 "] from topic [" + topic + "]", e);
 }
 }
 }
 
{code}
 

*JavaType javaType = this.typeMapper.toJavaType(headers)* extracts the first 
javaType, which is not the one that should be used to deserialize the object (it 
gets ChannelConfigNew rather than ChannelConfig). As a result the object is not 
retrieved properly: all fields are null.

  was:
We read messages from input topic, transform messages 
(ChannelConfigNew->ChannelConfig) and send it to another topic:

 
{code:java}
builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
 Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
 .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig, 
prometheusCounter, channelConfigPostDataHelper))
 
.to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
 Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));

{code}
where ChannelConfigProcessor (only essential part is shown)

 
{code:java}
public class ChannelConfigProcessor implements 
Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {
 public KeyValue<String, ChannelConfig> transform(String ccid, ChannelConfigNew 
channelConfigNew) {
 return new KeyValue<>(ccid, convert(channelConfigNew));
 }
 
 private ChannelConfig convert(ChannelConfigNew channelConfigNew){
 ...
 }

}
 
{code}
Both the input (ChannelConfigNew) and output (ChannelConfig) javaTypes are stored 
in the headers of the message sent to another topic. The input type is already 
present in the headers when 

[jira] [Created] (KAFKA-8879) GlobalStateUpdateTask uses wrong javaType to deserialize value

2019-09-05 Thread Vlad Olevsky (Jira)
Vlad Olevsky created KAFKA-8879:
---

 Summary: GlobalStateUpdateTask uses wrong javaType to deserialize 
value
 Key: KAFKA-8879
 URL: https://issues.apache.org/jira/browse/KAFKA-8879
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.3.0
Reporter: Vlad Olevsky


We read messages from an input topic, transform them 
(ChannelConfigNew -> ChannelConfig), and send them to another topic:

 
{code:java}
builder.stream(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaSrcTopic(),
 Consumed.with(Serdes.String(), new JsonSerde<>(ChannelConfigNew.class)))
 .transform(() -> new ChannelConfigProcessor(chlConfigValidationKafkaConfig, 
prometheusCounter, channelConfigPostDataHelper))
 
.to(chlConfigValidationKafkaConfig.getChlConfigValidationKafkaDestinationTopic(),
 Produced.with(Serdes.String(), new JsonSerde<>(ChannelConfig.class)));

{code}
where ChannelConfigProcessor (only essential part is shown)

 
{code:java}
public class ChannelConfigProcessor implements 
Transformer<String, ChannelConfigNew, KeyValue<String, ChannelConfig>> {
 public KeyValue<String, ChannelConfig> transform(String ccid, ChannelConfigNew 
channelConfigNew) {
 return new KeyValue<>(ccid, convert(channelConfigNew));
 }
 
 private ChannelConfig convert(ChannelConfigNew channelConfigNew){
 ...
 }

}
 
{code}
Both the input (ChannelConfigNew) and output (ChannelConfig) javaTypes are stored 
in the headers of the message sent to another topic. The input type is already 
present in the headers when serialization is called in
{code:java}
org.apache.kafka.streams.processor.internals.RecordCollectorImpl::send(final 
String topic,
 final K key,
 final V value,
 final Headers headers,
 final Integer partition,
 final Long timestamp,
final Serializer<K> keySerializer,
final Serializer<V> valueSerializer){
   final byte[] valBytes = valueSerializer.serialize(topic, headers, value);
}
 
{code}
The output type is added to the headers inside the valueSerializer.serialize() method:

org.springframework.kafka.support.serializer.JsonSerializer

 
{code:java}
@Override
 public byte[] serialize(String topic, Headers headers, T data) {
 if (data == null) {
 return null;
 }
 if (this.addTypeInfo && headers != null) {
 this.typeMapper.fromJavaType(this.objectMapper.constructType(data.getClass()), 
headers);
 }
 return serialize(topic, data);
 }
 
{code}
 

On the other side we have a GlobalKTable with a processor that retrieves the 
first javaType (ChannelConfigNew, the wrong one) from the headers and tries to 
deserialize the data using this type:

 
{code:java}
org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask
public void update(final ConsumerRecord<byte[], byte[]> record) {
...
final ConsumerRecord<Object, Object> deserialized = 
sourceNodeAndDeserializer.deserialize(processorContext, record);
..
}
{code}
 

 

sourceNodeAndDeserializer.deserialize calls
{code:java}
org.apache.kafka.streams.processor.internals.RecordDeserializer::deserialize()
{
sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), 
rawRecord.value()), rawRecord.headers());
}
 
eventually reaches
org.springframework.kafka.support.serializer.JsonDeserializer::deserialize(String
 topic, Headers headers, byte[] data){
 if (data == null) {
 return null;
 }
 JavaType javaType = this.typeMapper.toJavaType(headers);
 if (javaType == null) {
 Assert.state(this.targetType != null, "No type information in headers and no 
default type provided");
 return deserialize(topic, data);
 }
 else {
 try {
 return this.objectMapper.readerFor(javaType).readValue(data);
 }
 catch (IOException e) {
 throw new SerializationException("Can't deserialize data [" + 
Arrays.toString(data) +
 "] from topic [" + topic + "]", e);
 }
 }
 }
 
{code}
 

JavaType javaType = this.typeMapper.toJavaType(headers) extracts the first 
javaType, which is not the one that should be used to deserialize the object (it 
gets ChannelConfigNew rather than ChannelConfig). As a result the object is not 
retrieved properly: all fields are null.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (KAFKA-8853) Create sustained connections test for Trogdor

2019-09-05 Thread Scott Hendricks (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8853?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Scott Hendricks reassigned KAFKA-8853:
--

Assignee: Scott Hendricks

> Create sustained connections test for Trogdor
> -
>
> Key: KAFKA-8853
> URL: https://issues.apache.org/jira/browse/KAFKA-8853
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Scott Hendricks
>Assignee: Scott Hendricks
>Priority: Major
>
> There are currently tests that run a high number of connects and disconnects, 
> but there are no tests that create and maintain connections to bring Kafka to 
> its limit.
> My plan is to write a test that will take a desired number of clients 
> (KafkaConsumer, KafkaProducer, and AdminClient), the keep-alive rate for 
> these connections, and the number of threads desired to maintain these 
> connections.
> Each worker will spawn the desired number of threads that will find 
> connections that need to be maintained and act on them.
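
A minimal sketch of that worker model (names are hypothetical, not Trogdor's 
actual spec classes):

{code:java}
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SustainedConnectionWorkerSketch {
    interface MaintainedConnection {
        void keepAlive();  // e.g. a cheap metadata request on the client
    }

    // numThreads threads share one queue of connections due for a keep-alive.
    static ExecutorService start(int numThreads,
                                 BlockingQueue<MaintainedConnection> due) {
        ExecutorService pool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        due.take().keepAlive();  // block until one is due
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();  // shutdown path
                }
            });
        }
        return pool;
    }
}
{code}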



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-6461) TableTableJoinIntegrationTest is unstable if caching is enabled

2019-09-05 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923636#comment-16923636
 ] 

Boyang Chen commented on KAFKA-6461:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/24606/console]

> TableTableJoinIntegrationTest is unstable if caching is enabled
> ---
>
> Key: KAFKA-6461
> URL: https://issues.apache.org/jira/browse/KAFKA-6461
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: flaky-test
> Fix For: 1.1.0
>
>
> {noformat}
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
> testLeftInner[caching enabled = true] FAILED
> 20:41:05 java.lang.AssertionError: Condition not met within timeout 
> 15000. Never received expected final result.
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:276)
> 20:41:05 at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:254)
> 20:41:05 at 
> org.apache.kafka.streams.integration.AbstractJoinIntegrationTest.runTest(AbstractJoinIntegrationTest.java:248)
> 20:41:05 at 
> org.apache.kafka.streams.integration.TableTableJoinIntegrationTest.testLeftInner(TableTableJoinIntegrationTest.java:313){noformat}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8800) Flaky Test SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-09-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923611#comment-16923611
 ] 

Matthias J. Sax commented on KAFKA-8800:


[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/795/tests]

> Flaky Test 
> SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> --
>
> Key: KAFKA-8800
> URL: https://issues.apache.org/jira/browse/KAFKA-8800
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Affects Versions: 2.4.0
>Reporter: Matthias J. Sax
>Assignee: Anastasia Vela
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0, 2.3.1
>
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6956/testReport/junit/kafka.api/SaslScramSslEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/]
> {quote}org.scalatest.exceptions.TestFailedException: Consumed 0 records 
> before timeout instead of the expected 1 records at 
> org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530) at 
> org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529) 
> at 
> org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389) 
> at org.scalatest.Assertions.fail(Assertions.scala:1091) at 
> org.scalatest.Assertions.fail$(Assertions.scala:1087) at 
> org.scalatest.Assertions$.fail(Assertions.scala:1389) at 
> kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822) at 
> kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781) at 
> kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1312) at 
> kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1320) at 
> kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522)
>  at 
> kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361){quote}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8264) Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition

2019-09-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923608#comment-16923608
 ] 

Matthias J. Sax commented on KAFKA-8264:


[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/795/tests]

> Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition
> --
>
> Key: KAFKA-8264
> URL: https://issues.apache.org/jira/browse/KAFKA-8264
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.0.1, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/252/tests]
> {quote}org.apache.kafka.common.errors.TopicExistsException: Topic 'topic3' 
> already exists.{quote}
> STDOUT
>  
> {quote}[2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=1, 
> leaderId=0, fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,312] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,313] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,994] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:21,727] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition topicWithNewMessageFormat-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:28,696] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:28,699] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,246] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,247] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,287] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> 

[jira] [Commented] (KAFKA-8264) Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition

2019-09-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923604#comment-16923604
 ] 

Matthias J. Sax commented on KAFKA-8264:


[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk11/detail/kafka-trunk-jdk11/794/tests]

> Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition
> --
>
> Key: KAFKA-8264
> URL: https://issues.apache.org/jira/browse/KAFKA-8264
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.0.1, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/252/tests]
> {quote}org.apache.kafka.common.errors.TopicExistsException: Topic 'topic3' 
> already exists.{quote}
> STDOUT
>  
> {quote}[2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=1, 
> leaderId=0, fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,312] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,313] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,994] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:21,727] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition topicWithNewMessageFormat-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:28,696] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:28,699] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,246] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,247] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,287] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> 

[jira] [Assigned] (KAFKA-8878) Flaky Test AssignedStreamsTasksTest#shouldCloseCleanlyWithSuspendedTaskAndEOS

2019-09-05 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8878:
--

Assignee: Chris Pettitt

> Flaky Test AssignedStreamsTasksTest#shouldCloseCleanlyWithSuspendedTaskAndEOS
> -
>
> Key: KAFKA-8878
> URL: https://issues.apache.org/jira/browse/KAFKA-8878
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Assignee: Chris Pettitt
>Priority: Major
>  Labels: flaky-test
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3887/tests]
> {quote}java.lang.AssertionError: Expected no ERROR message while closing 
> assignedTasks, but got 1. First error: [AdminClient clientId=adminclient-67] 
> Connection to node -1 (localhost/127.0.0.1:8080) could not be established. 
> Broker may not be available.. Cause: N/A
> at org.junit.Assert.fail(Assert.java:89)
> at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasksTest.shouldCloseCleanlyWithSuspendedTaskAndEOS(AssignedStreamsTasksTest.java:555){quote}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8878) Flaky Test AssignedStreamsTasksTest#shouldCloseCleanlyWithSuspendedTaskAndEOS

2019-09-05 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-8878:
--

 Summary: Flaky Test 
AssignedStreamsTasksTest#shouldCloseCleanlyWithSuspendedTaskAndEOS
 Key: KAFKA-8878
 URL: https://issues.apache.org/jira/browse/KAFKA-8878
 Project: Kafka
  Issue Type: Bug
  Components: streams, unit tests
Reporter: Matthias J. Sax


[https://builds.apache.org/blue/organizations/jenkins/kafka-trunk-jdk8/detail/kafka-trunk-jdk8/3887/tests]
{quote}java.lang.AssertionError: Expected no ERROR message while closing 
assignedTasks, but got 1. First error: [AdminClient clientId=adminclient-67] 
Connection to node -1 (localhost/127.0.0.1:8080) could not be established. 
Broker may not be available.. Cause: N/A
at org.junit.Assert.fail(Assert.java:89)
at 
org.apache.kafka.streams.processor.internals.AssignedStreamsTasksTest.shouldCloseCleanlyWithSuspendedTaskAndEOS(AssignedStreamsTasksTest.java:555){quote}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-8724) log cleaner thread dies when attempting to clean a __consumer_offsets partition after upgrade from 2.0->2.3

2019-09-05 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8724?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8724.

Resolution: Fixed

> log cleaner thread dies when attempting to clean a __consumer_offsets 
> partition after upgrade from 2.0->2.3
> ---
>
> Key: KAFKA-8724
> URL: https://issues.apache.org/jira/browse/KAFKA-8724
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.3.0
> Environment: Linux 3.10.0-862.2.3.el7.x86_64 #1 SMP Wed May 9 
> 18:05:47 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Keith So
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 2.3.1
>
> Attachments: KAFKA-308-stack-trace.txt
>
>
> We are attempting an upgrade from Kafka 2.0 to 2.3 on a single cluster setup. 
>  We have a mixture of Java/C++ and Python clients (Python clients are using 
> kafka-python libraries).
> After the upgrade, the log cleaner occasionally dies with the attached stack 
> trace.  Using timestamp correlation, we pinned it down to the cleaning of a 
> __consumer_offsets partition.  The config logged at initialization shows that 
> inter.broker.protocol.version = 2.3-IV1
> log.message.format.version = 2.3-IV1
> We initially thought this was due to an unclean upgrade from 2.0 to 2.3, but 
> after resetting the consumer offsets topic (via 
> [https://medium.com/@nblaye/reset-consumer-offsets-topic-in-kafka-with-zookeeper-5910213284a2])
> the problem still recurs on initially empty consumer offset partitions.
> At the moment we are working around the issue by toggling the 
> log.cleaner.threads option via dynamic broker configuration to restore the log 
> cleaner threads.
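
For illustration, that toggle can also be issued through the Admin API; a minimal 
sketch with placeholder address and values (kafka-configs.sh --alter --entity-type 
brokers --entity-default --add-config log.cleaner.threads=... achieves the same):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // "" selects the cluster-wide default broker resource (--entity-default).
    ConfigResource allBrokers = new ConfigResource(ConfigResource.Type.BROKER, "");
    AlterConfigOp setThreads = new AlterConfigOp(
        new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(
        Collections.singletonMap(allBrokers, Collections.singletonList(setThreads)))
        .all().get();  // toggling the value restarts the dead cleaner threads
}
{code}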



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8724) log cleaner thread dies when attempting to clean a __consumer_offsets partition after upgrade from 2.0->2.3

2019-09-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8724?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923578#comment-16923578
 ] 

ASF GitHub Bot commented on KAFKA-8724:
---

hachikuji commented on pull request #7264: KAFKA-8724; Improve range checking 
when computing cleanable partitions
URL: https://github.com/apache/kafka/pull/7264
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> log cleaner thread dies when attempting to clean a __consumer_offsets 
> partition after upgrade from 2.0->2.3
> ---
>
> Key: KAFKA-8724
> URL: https://issues.apache.org/jira/browse/KAFKA-8724
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.3.0
> Environment: Linux 3.10.0-862.2.3.el7.x86_64 #1 SMP Wed May 9 
> 18:05:47 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
>Reporter: Keith So
>Assignee: Jason Gustafson
>Priority: Critical
> Fix For: 2.3.1
>
> Attachments: KAFKA-308-stack-trace.txt
>
>
> We are attempting an upgrade from Kafka 2.0 to 2.3 on a single cluster setup. 
>  We have a mixture of Java/C++ and Python clients (Python clients are using 
> kafka-python libraries).
> After the upgrade, the log cleaner occasionally dies with the attached stack 
> trace.  Using timestamp correlation, we pinned it down to the cleaning of a 
> __consumer_offsets partition.  The config logged at initialization shows that 
> inter.broker.protocol.version = 2.3-IV1
> log.message.format.version = 2.3-IV1
> We initially thought this was to do with unclean upgrade from 2.0 to 2.3, but 
> after resetting the consumer offsets topic (via 
> [https://medium.com/@nblaye/reset-consumer-offsets-topic-in-kafka-with-zookeeper-5910213284a2])
>   this still recurs on initially empty consumer offset partitions.
> At the moment we are working around by toggling log.cleaner.threads option 
> using dynamic broker configuration to restore the log cleaner threads



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Comment Edited] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923379#comment-16923379
 ] 

Oleg Kuznetsov edited comment on KAFKA-8877 at 9/5/19 12:36 PM:


[~huxi_2b]

Yes, you are right.

 

But it looks like the same problem has migrated to *StickyPartitionCache*:

*indexCache* would be better populated/changed atomically
{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that
    // the partition that triggered the new batch matches the sticky partition that
    // needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches
        // the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}


was (Author: olkuznsmith):
[~huxi_2b]

Yes, you are right.

 

But it looks like the same problem has migrated to *StickyPartitionCache*:

*indexCache* would be better populated/changed atomically
{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that
    // the partition that triggered the new batch matches the sticky partition that
    // needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches
        // the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}

> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
> AtomicInteger counter = topicCounterMap.get(topic);
> if (null == counter) {
> counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
> AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
> counter);
> if (currentCounter != null) {
> counter = currentCounter;
> }
> }
> return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ 
> -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923379#comment-16923379
 ] 

Oleg Kuznetsov commented on KAFKA-8877:
---

[~huxi_2b]

Yes, you are right.

 

But it looks like the same problem has migrated to *StickyPartitionCache*:

*indexCache* would be better populated/changed atomically
{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    Integer oldPart = indexCache.get(topic);
    Integer newPart = oldPart;
    // Check that the current sticky partition for the topic is either not set or that
    // the partition that triggered the new batch matches the sticky partition that
    // needs to be changed.
    if (oldPart == null || oldPart == prevPartition) {
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() < 1) {
            Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = random % partitions.size();
        } else if (availablePartitions.size() == 1) {
            newPart = availablePartitions.get(0).partition();
        } else {
            while (newPart == null || newPart.equals(oldPart)) {
                Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
                newPart = availablePartitions.get(random % availablePartitions.size()).partition();
            }
        }
        // Only change the sticky partition if it is null or prevPartition matches
        // the current sticky partition.
        if (oldPart == null) {
            indexCache.putIfAbsent(topic, newPart);
        } else {
            indexCache.replace(topic, prevPartition, newPart);
        }
        return indexCache.get(topic);
    }
    return indexCache.get(topic);
}
{code}
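
A minimal sketch of the atomic variant suggested above, assuming *indexCache* is a 
ConcurrentHashMap<String, Integer>; ConcurrentHashMap.compute runs atomically per 
key, so the read-check-update sequence above can no longer interleave across threads:

{code:java}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
    return indexCache.compute(topic, (t, oldPart) -> {
        // Keep the current sticky partition unless it is unset or it is the
        // partition that triggered the new batch (prevPartition).
        if (oldPart != null && oldPart != prevPartition)
            return oldPart;
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(t);
        if (availablePartitions.isEmpty()) {
            int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            return random % cluster.partitionsForTopic(t).size();
        }
        if (availablePartitions.size() == 1)
            return availablePartitions.get(0).partition();
        Integer newPart = null;
        while (newPart == null || newPart.equals(oldPart)) {
            int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
            newPart = availablePartitions.get(random % availablePartitions.size()).partition();
        }
        return newPart;
    });
}
{code}

The trade-off is that the mapping function runs while compute holds the lock for 
that key, so the partition lookup inside it should stay cheap.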

> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
> AtomicInteger counter = topicCounterMap.get(topic);
> if (null == counter) {
> counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
> AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
> counter);
> if (currentCounter != null) {
> counter = currentCounter;
> }
> }
> return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ 
> -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7994) Improve Stream-Time for rebalances and restarts

2019-09-05 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923359#comment-16923359
 ] 

Bruno Cadonna commented on KAFKA-7994:
--

I applied PR #6694 to the example I presented in my comment above. 
Unfortunately, it does not solve the issue, because the stream-time handling is 
a bit intricate: in addition to the partition time, a local processor time that 
depends on the partition time also needs to be kept and made fail-safe. 
Although [~Yohan123]'s PR does not fix the issue, it is an important first step 
toward getting it fixed.
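
The potential fix sketched in the quoted description below (storing the latest 
observed partition-time in committed-offset metadata) could look roughly like 
this, assuming a plain string encoding of the timestamp in OffsetAndMetadata:

{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Piggy-back the observed partition-time on the offset commit.
void commitWithPartitionTime(KafkaConsumer<?, ?> consumer, TopicPartition tp,
                             long offset, long partitionTime) {
    consumer.commitSync(Collections.singletonMap(
        tp, new OffsetAndMetadata(offset, Long.toString(partitionTime))));
}

// On restart/rebalance, recover it instead of falling back to UNKNOWN (-1).
long restorePartitionTime(KafkaConsumer<?, ?> consumer, TopicPartition tp) {
    OffsetAndMetadata committed = consumer.committed(tp);
    return (committed == null || committed.metadata().isEmpty())
        ? -1L
        : Long.parseLong(committed.metadata());
}
{code}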

> Improve Stream-Time for rebalances and restarts
> ---
>
> Key: KAFKA-7994
> URL: https://issues.apache.org/jira/browse/KAFKA-7994
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Richard Yu
>Priority: Major
> Attachments: possible-patch.diff
>
>
> We compute a per-partition partition-time as the maximum timestamp over all 
> records processed so far. Furthermore, we use partition-time to compute 
> stream-time for each task as the maximum over all partition-times (for all 
> corresponding task partitions). This stream-time is used to decide whether to 
> process out-of-order records or drop them if they are late (ie, 
> timestamp < stream-time - grace-period).
> During rebalances and restarts, stream-time is initialized as UNKNOWN (ie, 
> -1) for tasks that are newly created (or migrated). In effect, we forget the 
> current stream-time in this case, which may lead to non-deterministic behavior 
> if we stop processing right before a late record that would be dropped if we 
> continued processing, but is not dropped after the rebalance/restart. Let's 
> look at an example with a grace period of 5ms for a tumbling window of 5ms, 
> and the following records (timestamps in parentheses):
>  
> {code:java}
> r1(0) r2(5) r3(11) r4(2){code}
> In the example, stream-time advances as 0, 5, 11, 11 and thus record `r4` is 
> dropped as late because 2 < 6 = 11 - 5. However, if we stop processing or 
> rebalance after processing `r3` but before processing `r4`, we would 
> reinitialize stream-time as -1, and thus would process `r4` on restart/after 
> the rebalance. The problem is that stream-time advances differently from a 
> global point of view: 0, 5, 11, 2.
> Note, this is a corner case: if we stopped processing one record earlier, ie, 
> after processing `r2` but before processing `r3`, stream-time would advance 
> correctly from a global point of view.
> A potential fix would be to store the latest observed partition-time in the 
> metadata of committed offsets. That way, on restart/rebalance we can 
> re-initialize time correctly.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread huxihx (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923313#comment-16923313
 ] 

huxihx commented on KAFKA-8877:
---

It seems `nextValue` was already removed by 
[KAFKA-8601|https://issues.apache.org/jira/browse/KAFKA-8601]. 

> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
> AtomicInteger counter = topicCounterMap.get(topic);
> if (null == counter) {
> counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
> AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
> counter);
> if (currentCounter != null) {
> counter = currentCounter;
> }
> }
> return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ 
> -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8876) KafkaBasedLog does not throw exception when some partitions of the topic is offline

2019-09-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923242#comment-16923242
 ] 

ASF GitHub Bot commented on KAFKA-8876:
---

huxihx commented on pull request #7300: KAFKA-8876:KafkaBasedLog does not throw 
exception when some partition…
URL: https://github.com/apache/kafka/pull/7300
 
 
   …s of the topic is offline
   
   https://issues.apache.org/jira/browse/KAFKA-8876
   
   When starting up, KafkaBasedLog should throw ConnectException if any of the 
subscribed partitions has no leader.
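
   A minimal sketch of such a check, assuming the consumer's metadata reports a 
   null leader() for a leaderless partition:
   
   {code:java}
   import java.util.List;
   import org.apache.kafka.clients.consumer.KafkaConsumer;
   import org.apache.kafka.common.PartitionInfo;
   import org.apache.kafka.connect.errors.ConnectException;
   
   void ensureAllPartitionsHaveLeaders(KafkaConsumer<?, ?> consumer, String topic) {
       List<PartitionInfo> partitions = consumer.partitionsFor(topic);
       for (PartitionInfo p : partitions) {
           if (p.leader() == null) {
               throw new ConnectException("Partition " + p.partition() + " of topic " + topic
                   + " has no leader; refusing to start so the caller can retry");
           }
       }
   }
   {code}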
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaBasedLog does not throw exception when some partitions of the topic is 
> offline
> ---
>
> Key: KAFKA-8876
> URL: https://issues.apache.org/jira/browse/KAFKA-8876
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Boquan Tang
>Assignee: huxihx
>Priority: Major
>
> Currently KafkaBasedLog does not check whether *all* partitions in the topic 
> are online. This may result in it ignoring partitions that are still 
> recovering and in turn reporting a null offset to KafkaOffsetBackingStore for 
> the affected partition, while in fact it should either wait or fail the 
> connector thread to prompt a retry, so that the offset can be correctly loaded 
> by the connector.
> Specifically, we are using the Debezium MySQL connector to replicate MySQL 
> binlogs to Kafka.
> In an attempt to restart after a cluster outage, we observed the following:
> {code}
> 2019-08-29T19:27:32Z INFO 
> [org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Starting 
> KafkaOffsetBackingStore
> 2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.util.KafkaBasedLog] 
> [main] Starting KafkaBasedLog with topic bobqueue-binlog-shovel-v1-offsets
> ...skipped client config logs...
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-12 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-10 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-21 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-5 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-20 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-18 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-2 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-13 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> 

[jira] [Assigned] (KAFKA-8876) KafkaBasedLog does not throw exception when some partitions of the topic is offline

2019-09-05 Thread huxihx (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

huxihx reassigned KAFKA-8876:
-

Assignee: huxihx

> KafkaBasedLog does not throw exception when some partitions of the topic is 
> offline
> ---
>
> Key: KAFKA-8876
> URL: https://issues.apache.org/jira/browse/KAFKA-8876
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Boquan Tang
>Assignee: huxihx
>Priority: Major
>
> Currently KafkaBasedLog does not check whether *all* partitions in the topic 
> are online. This may result in it ignoring partitions that are still 
> recovering and in turn reporting a null offset to KafkaOffsetBackingStore for 
> the affected partition, while in fact it should either wait or fail the 
> connector thread to prompt a retry, so that the offset can be correctly loaded 
> by the connector.
> Specifically, we are using the Debezium MySQL connector to replicate MySQL 
> binlogs to Kafka.
> In an attempt to restart after a cluster outage, we observed the following:
> {code}
> 2019-08-29T19:27:32Z INFO 
> [org.apache.kafka.connect.storage.KafkaOffsetBackingStore] [main] Starting 
> KafkaOffsetBackingStore
> 2019-08-29T19:27:32Z INFO [org.apache.kafka.connect.util.KafkaBasedLog] 
> [main] Starting KafkaBasedLog with topic bobqueue-binlog-shovel-v1-offsets
> ...skipped client config logs...
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-12 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-10 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-21 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-5 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-20 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-18 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-2 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-13 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-11 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-8 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-23 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-7 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-22 to offset 0.
> 2019-08-29T19:27:33Z INFO 
> [org.apache.kafka.clients.consumer.internals.Fetcher] [main] [Consumer 
> clientId=consumer-1, groupId=bobqueue-binlog-shovel-v1] Resetting offset for 
> partition bobqueue-binlog-shovel-v1-offsets-6 to 

[jira] [Resolved] (KAFKA-8866) Make Authorizer create/delete exceptions Optional

2019-09-05 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8866.
---
  Reviewer: Ismael Juma
Resolution: Fixed

> Make Authorizer create/delete exceptions Optional
> -
>
> Key: KAFKA-8866
> URL: https://issues.apache.org/jira/browse/KAFKA-8866
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.4.0
>
>
> From PR review of 
> [https://github.com/apache/kafka/pull/7268]:
> We currently return a possibly null ApiException in AclCreateResult and 
> AclDeleteResult. It would be better to return Optional.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Resolved] (KAFKA-8857) Config describe should not return isReadOnly=false based on synonyms

2019-09-05 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8857.
---
  Reviewer: Manikumar
Resolution: Fixed

> Config describe should not return isReadOnly=false based on synonyms
> 
>
> Key: KAFKA-8857
> URL: https://issues.apache.org/jira/browse/KAFKA-8857
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.4.0
>
>
> At the moment, for configs like log.retention.hours which have multiple 
> synonyms (log.retention.ms, log.retention.minutes), we return 
> `isReadOnly=false` in the describeConfigs response for all the synonyms even 
> though only log.retention.ms can be updated dynamically. We should return 
> isReadOnly=false for log.retention.ms and isReadOnly=true for 
> log.retention.hours and log.retention.minutes to avoid confusion. Users can 
> still determine whether there are updatable synonyms by looking at the 
> synonyms returned in the describeConfigs response.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8857) Config describe should not return isReadOnly=false based on synonyms

2019-09-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923178#comment-16923178
 ] 

ASF GitHub Bot commented on KAFKA-8857:
---

rajinisivaram commented on pull request #7278: KAFKA-8857; Don't check synonyms 
while determining if config is readOnly
URL: https://github.com/apache/kafka/pull/7278
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Config describe should not return isReadOnly=false based on synonyms
> 
>
> Key: KAFKA-8857
> URL: https://issues.apache.org/jira/browse/KAFKA-8857
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.4.0
>
>
> At the moment, for configs like log.retention.hours which have multiple 
> synonyms (log.retention.ms, log.retention.minutes), we return 
> `isReadOnly=false` in the describeConfigs response for all the synonyms even 
> though only log.retention.ms can be updated dynamically. We should return 
> isReadOnly=false for log.retention.ms and isReadOnly=true for 
> log.retention.hours and log.retention.minutes to avoid confusion. Users can 
> still determine whether there are updatable synonyms by looking at the 
> synonyms returned in the describeConfigs response.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8866) Make Authorizer create/delete exceptions Optional

2019-09-05 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923174#comment-16923174
 ] 

ASF GitHub Bot commented on KAFKA-8866:
---

rajinisivaram commented on pull request #7294: KAFKA-8866; Return exceptions as 
Optional in authorizer API
URL: https://github.com/apache/kafka/pull/7294
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Make Authorizer create/delete exceptions Optional
> -
>
> Key: KAFKA-8866
> URL: https://issues.apache.org/jira/browse/KAFKA-8866
> Project: Kafka
>  Issue Type: Sub-task
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.4.0
>
>
> From PR review of 
> [https://github.com/apache/kafka/pull/7268]:
> We currently return a possibly null ApiException in AclCreateResult and 
> AclDeleteResult. It would be better to return Optional.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-8877:
--
Description: 
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, _ 
-> ...)* (init the counter once per topic).
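
For illustration, a minimal sketch of that proposal, assuming *topicCounterMap* 
is a ConcurrentHashMap<String, AtomicInteger>:

{code:java}
private int nextValue(String topic) {
    // computeIfAbsent guarantees the initializer runs at most once per key,
    // so no throw-away AtomicInteger instances are created under contention.
    return topicCounterMap
        .computeIfAbsent(topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()))
        .getAndIncrement();
}
{code}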

 

  was:
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, 
_ -> ...)* (init the counter once per topic).

 


> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
> AtomicInteger counter = topicCounterMap.get(topic);
> if (null == counter) {
> counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
> AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
> counter);
> if (currentCounter != null) {
> counter = currentCounter;
> }
> }
> return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, _ 
> -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8877?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Oleg Kuznetsov updated KAFKA-8877:
--
Description: 
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, 
_ -> ...)* (init the counter once per topic).

 

  was:
In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, 
_ -> ...)* (init the counter once per topic).

 


> Race condition on partition counter
> ---
>
> Key: KAFKA-8877
> URL: https://issues.apache.org/jira/browse/KAFKA-8877
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 2.2.1
>Reporter: Oleg Kuznetsov
>Priority: Major
>
> In the method:
> *org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
> {code:java}
> private int nextValue(String topic) {
> AtomicInteger counter = topicCounterMap.get(topic);
> if (null == counter) {
> counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
> AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
> counter);
> if (currentCounter != null) {
> counter = currentCounter;
> }
> }
> return counter.getAndIncrement();
> }
> {code}
> the counter might be created multiple times instead of once.
> I propose to replace it with something like *topicCounterMap.compute(topic, 
> _ -> ...)* (init the counter once per topic).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8877) Race condition on partition counter

2019-09-05 Thread Oleg Kuznetsov (Jira)
Oleg Kuznetsov created KAFKA-8877:
-

 Summary: Race condition on partition counter
 Key: KAFKA-8877
 URL: https://issues.apache.org/jira/browse/KAFKA-8877
 Project: Kafka
  Issue Type: Bug
  Components: producer 
Affects Versions: 2.2.1
Reporter: Oleg Kuznetsov


In the method:

*org.apache.kafka.clients.producer.internals.DefaultPartitioner#nextValue*
{code:java}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, 
counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
{code}
the counter might be created multiple times instead of once.

I propose to replace it with something like *topicCounterMap.compute(topic, 
_ -> ...)* (init the counter once per topic).

 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8264) Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition

2019-09-05 Thread Levani Kokhreidze (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16923099#comment-16923099
 ] 

Levani Kokhreidze commented on KAFKA-8264:
--

[https://builds.apache.org/blue/organizations/jenkins/kafka-pr-jdk11-scala2.13/detail/kafka-pr-jdk11-scala2.13/1422/tests]

> Flaky Test PlaintextConsumerTest#testLowMaxFetchSizeForRequestAndPartition
> --
>
> Key: KAFKA-8264
> URL: https://issues.apache.org/jira/browse/KAFKA-8264
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Affects Versions: 2.0.1, 2.3.0
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.4.0
>
>
> [https://builds.apache.org/blue/organizations/jenkins/kafka-2.0-jdk8/detail/kafka-2.0-jdk8/252/tests]
> {quote}org.apache.kafka.common.errors.TopicExistsException: Topic 'topic3' 
> already exists.{quote}
> STDOUT
>  
> {quote}[2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=1, 
> leaderId=0, fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,080] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,312] ERROR [ReplicaFetcher replicaId=2, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,313] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:20,994] ERROR [ReplicaFetcher replicaId=1, leaderId=0, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:21,727] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition topicWithNewMessageFormat-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:28,696] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:28,699] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,246] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,247] ERROR [ReplicaFetcher replicaId=0, leaderId=1, 
> fetcherId=0] Error for partition topic-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:29,287] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition topic-1 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this topic-partition.
> [2019-04-19 03:54:33,408] ERROR [ReplicaFetcher replicaId=1, leaderId=2, 
> fetcherId=0] Error for partition __consumer_offsets-0 at offset 0 
> (kafka.server.ReplicaFetcherThread:76)
> org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server 
> does not host this