[GitHub] [kafka] satishd closed pull request #11197: 28x TS changes

2021-08-16 Thread GitBox


satishd closed pull request #11197:
URL: https://github.com/apache/kafka/pull/11197


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #11198: MINOR: Use compressionType parameter in SnapshotWriter

2021-08-16 Thread GitBox


dengziming commented on pull request #11198:
URL: https://github.com/apache/kafka/pull/11198#issuecomment-80080


   @showuon Thank you, I am thinking about how I can test this case.






[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

2021-08-16 Thread GitBox


mjsax commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r690013977



##
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,78 @@
 
 private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                  final String inOrderName,
+                                                  final String reverseName,
+                                                  final long windowSize) {
+    return inOrderIterator
+        ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+        : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+    final StreamsBuilder builder = new StreamsBuilder();
+
+    // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+    final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   Hmmm... Seems to be an issue... The actual final return type is `KTable<Windowed<K>, V>` and thus is window-type agnostic. So we already have such a "container". -- However, `windowedBy(SlidingWindows)` returns a `TimeWindowedKStream`... Return types are not easy to change... And I don't think we can just switch from `TimeWindow` to `SlidingWindow` as the concrete type for the sliding window case either...
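   To make the type constraint concrete, here is a minimal sketch of the relationships under discussion (interfaces carry a `Sketch` suffix to stress this is an illustration, not the real API):
   
   ```java
   // Simplified shapes mirroring the public Kafka Streams API.
   interface WindowedSketch<K> { K key(); }
   
   interface KTableSketch<K, V> { }
   
   interface TimeWindowedKStreamSketch<K, V> {
       // The final result is window-type agnostic: only the Windowed wrapper appears.
       KTableSketch<WindowedSketch<K>, Long> count();
   }
   
   interface SlidingWindowsSketch { }
   
   interface KGroupedStreamSketch<K, V> {
       // ...but the intermediate type is pinned: sliding windows are funneled through
       // the same TimeWindowed intermediate as hopping/tumbling windows, which is why
       // the concrete window type cannot easily be switched to SlidingWindow.
       TimeWindowedKStreamSketch<K, V> windowedBy(SlidingWindowsSketch windows);
   }
   ```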
   
   Maybe we are stuck and cannot fix the bug without a breaking change? For 
this case, we would indeed need to carry on with the KIP (but we could only do 
it in 4.0...), but I am wondering if it's worth fixing given the impact?
   
   Also: we have a few issues with the current DSL that we cannot fix easily (eg KIP-300). Thus, a long-term solution could be to leave the current API as-is and build a new DSL in parallel (we did this in the past when we introduced `StreamsBuilder`). This way, we can change the API in any way, but it would be a long-term solution only.
   
   It might also help with regard to the new PAPI that uses `Record` instead of a `<K, V>` pair, and that is not easily adopted for `transform()` (and siblings). We could change the whole DSL to `Record` (ie, `KStream<Record<K, V>>` -- of course we don't need `Record` in the generic type -- it's just for illustrative purposes). It would also cover the "add headers" KIP, fix KIP-300, we could introduce a `PartitionedKStream` (cf current KIP-759 discussion) and a few other minor issues (like renaming `KGroupedStream` to `GroupedKStream`) all at once... And we could clean up the topology optimization step and operator naming rules (which are a big mess to understand -- which `Named` object overwrites which...) -- We can also get rid of the wrappers for `KeyValueStore` to `TimestampedKeyValueStore` and change the interface from `Materialized` to `Materialized

[GitHub] [kafka] showuon commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

2021-08-16 Thread GitBox


showuon commented on a change in pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#discussion_r690007620



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
##
@@ -48,6 +53,16 @@
 public class ConsumerConfig extends AbstractConfig {
 private static final ConfigDef CONFIG;
 
+// a list that contains all the in-product assignor names. Should be updated when a new assignor is added.
+// This is to help optimize the ConsumerCoordinator#performAssignment
+public static final List<String> IN_PRODUCT_ASSIGNOR_NAMES =
+    Collections.unmodifiableList(Arrays.asList(
+        RANGE_ASSIGNOR_NAME,
+        ROUNDROBIN_ASSIGNOR_NAME,
+        STICKY_ASSIGNOR_NAME,
+        COOPERATIVE_STICKY_ASSIGNOR_NAME

Review comment:
   Question: Should we add `StreamsPartitionAssignor` here?
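   For illustration, a hedged sketch of the optimization this list enables (helper and call site assumed, not taken from the PR): if every configured assignor is in-product, the coordinator can skip the extra per-member validation that only custom assignors require.
   
   ```java
   import java.util.Arrays;
   import java.util.List;
   
   public class AssignorCheckSketch {
       // True when only built-in assignors are configured, so the extra check
       // in performAssignment can be skipped.
       static boolean allInProduct(List<String> configured, List<String> inProduct) {
           return inProduct.containsAll(configured);
       }
   
       public static void main(String[] args) {
           // The built-in names: range, roundrobin, sticky, cooperative-sticky.
           List<String> inProduct = Arrays.asList("range", "roundrobin", "sticky", "cooperative-sticky");
           System.out.println(allInProduct(Arrays.asList("range"), inProduct));  // true -> skip
           System.out.println(allInProduct(Arrays.asList("custom"), inProduct)); // false -> validate
       }
   }
   ```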








[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689978096



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
-    : (mutable.HashMap[TopicPartition, Boolean],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
-    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
-    delta.deletedTopicIds().forEach { topicId =>
-      val topicImage = delta.image().getTopic(topicId)
-      topicImage.partitions().keySet().forEach { partitionId =>
-        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
-      }
-    }
-    val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    delta.changedTopics().values().forEach { topicDelta =>
-      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
-        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
-        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-    }
-    (deleted, newLocalLeaders, newLocalFollowers)
-  }
-
   /**
    * Apply a KRaft topic change delta.
    *
    * @param newImage The new metadata image.
    * @param delta    The delta to apply.
    */
   def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
-    // Before taking the lock, build some hash maps that we will need.
-    val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta)
+    // Before taking the lock, compute the local changes
+    val localChanges = delta.localChanges(config.nodeId)
 
     replicaStateChangeLock.synchronized {
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
-      if (!deleted.isEmpty) {
-        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
-        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+      if (!localChanges.deletes.isEmpty) {

Review comment:
   If that happens then `TopicsDelta` would remove it from `changedTopics`: 
https://github.com/apache/kafka/pull/11216/files#diff-521beb14ca284c8e5ed56e92271216167da342c44b86992add15f33c39128ecbR93
   
   ```
   public String replay(RemoveTopicRecord record) {
       TopicDelta topicDelta = changedTopics.remove(record.topicId());
       ...
   }
   ```
   
   Let me write a test that shows this.
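   For illustration, a hedged sketch of what such a test could look like (class and method names assumed; only APIs already quoted in this thread are used):
   
   ```java
   import org.apache.kafka.common.Uuid;
   import org.apache.kafka.common.metadata.RemoveTopicRecord;
   import org.apache.kafka.common.metadata.TopicRecord;
   import org.apache.kafka.image.TopicsDelta;
   import org.apache.kafka.image.TopicsImage;
   
   public class CreateThenDeleteSketch {
       void topicCreatedThenDeletedDoesNotResurface() {
           TopicsDelta delta = new TopicsDelta(TopicsImage.EMPTY);
           Uuid id = Uuid.randomUuid();
           delta.replay(new TopicRecord().setName("short-lived").setTopicId(id));
           delta.replay(new RemoveTopicRecord().setTopicId(id));
           // replay(RemoveTopicRecord) removes the entry from changedTopics (see the
           // snippet above), so the topic cannot show up as a leader/follower change.
           assert !delta.changedTopics().containsKey(id);
       }
   }
   ```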









[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689974956



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -2074,48 +2073,23 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private[kafka] def calculateDeltaChanges(delta: TopicsDelta)
-    : (mutable.HashMap[TopicPartition, Boolean],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo],
-       mutable.HashMap[TopicPartition, LocalLeaderInfo]) = {
-    val deleted = new mutable.HashMap[TopicPartition, Boolean]()
-    delta.deletedTopicIds().forEach { topicId =>
-      val topicImage = delta.image().getTopic(topicId)
-      topicImage.partitions().keySet().forEach { partitionId =>
-        deleted.put(new TopicPartition(topicImage.name(), partitionId), true)
-      }
-    }
-    val newLocalLeaders = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    val newLocalFollowers = new mutable.HashMap[TopicPartition, LocalLeaderInfo]()
-    delta.changedTopics().values().forEach { topicDelta =>
-      topicDelta.newLocalLeaders(config.nodeId).forEach { e =>
-        newLocalLeaders.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-      topicDelta.newLocalFollowers(config.nodeId).forEach { e =>
-        newLocalFollowers.put(new TopicPartition(topicDelta.name(), e.getKey),
-          LocalLeaderInfo(topicDelta.id(), e.getValue))
-      }
-    }
-    (deleted, newLocalLeaders, newLocalFollowers)
-  }
-
   /**
    * Apply a KRaft topic change delta.
    *
    * @param newImage The new metadata image.
    * @param delta    The delta to apply.
    */
   def applyDelta(newImage: MetadataImage, delta: TopicsDelta): Unit = {
-    // Before taking the lock, build some hash maps that we will need.
-    val (deleted, newLocalLeaders, newLocalFollowers) = calculateDeltaChanges(delta)
+    // Before taking the lock, compute the local changes
+    val localChanges = delta.localChanges(config.nodeId)
 
     replicaStateChangeLock.synchronized {
       // Handle deleted partitions. We need to do this first because we might subsequently
       // create new partitions with the same names as the ones we are deleting here.
-      if (!deleted.isEmpty) {
-        stateChangeLogger.info(s"Deleting ${deleted.size} partition(s).")
-        stopPartitions(deleted).foreach { case (topicPartition, e) =>
+      if (!localChanges.deletes.isEmpty) {

Review comment:
   Is it always correct to process the deletes first? For example, a topic 
could be first created and then deleted. By processing the deletes first, we 
would be keeping the partition and the log when they should be deleted?








[GitHub] [kafka] showuon commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

2021-08-16 Thread GitBox


showuon commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r689953353



##
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,78 @@
 
 private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                  final String inOrderName,
+                                                  final String reverseName,
+                                                  final long windowSize) {
+    return inOrderIterator
+        ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+        : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+    final StreamsBuilder builder = new StreamsBuilder();
+
+    // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+    final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   @mjsax , this is the one remaining open question in this PR. Because we use `TimeWindow` to represent the windowed key, we can only set a window size > 0. One solution suggested by @ableegoldman is to have a window container that only holds the start and end time. What do you think?
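   For illustration, a hedged sketch of such a container (class name assumed): the abstract `Window` base class only rejects `endMs < startMs`, so, unlike `TimeWindow`, it can represent a zero time difference.
   
   ```java
   import org.apache.kafka.streams.kstream.Window;
   
   public final class StartEndWindow extends Window {
       // Window's constructor allows startMs == endMs, which TimeWindow forbids,
       // so a size-0 window is representable here.
       public StartEndWindow(final long startMs, final long endMs) {
           super(startMs, endMs);
       }
   
       @Override
       public boolean overlap(final Window other) {
           return startMs <= other.end() && other.start() <= endMs;
       }
   }
   ```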








[GitHub] [kafka] mjsax commented on a change in pull request #11124: KAFKA-12839: Let SlidingWindows aggregation support window size of 0

2021-08-16 Thread GitBox


mjsax commented on a change in pull request #11124:
URL: https://github.com/apache/kafka/pull/11124#discussion_r689950309



##
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,78 @@
 
 private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                  final String inOrderName,
+                                                  final String reverseName,
+                                                  final long windowSize) {
+    return inOrderIterator
+        ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+        : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+    final StreamsBuilder builder = new StreamsBuilder();
+
+    // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here
+    final WindowBytesStoreSupplier storeSupplier = getStoreSupplier(inOrderIterator, defaultInOrderName, defaultReverseName, 1L);

Review comment:
   Size should be `0L` ?

##
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregateTest.java
##
@@ -100,17 +99,78 @@
 
 private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.String(), Serdes.String());
 private final String threadId = Thread.currentThread().getName();
+private final String topic = "topic";
+private final String defaultInOrderName = "InOrder";
+private final String defaultReverseName = "Reverse";
+private final long defaultWindowSize = 10L;
+private final long defaultRetentionPeriod = 5000L;
+
+private WindowBytesStoreSupplier getStoreSupplier(final boolean inOrderIterator,
+                                                  final String inOrderName,
+                                                  final String reverseName,
+                                                  final long windowSize) {
+    return inOrderIterator
+        ? new InOrderMemoryWindowStoreSupplier(inOrderName, defaultRetentionPeriod, windowSize, false)
+        : Stores.inMemoryWindowStore(reverseName, ofMillis(defaultRetentionPeriod), ofMillis(windowSize), false);
+}
+
+@SuppressWarnings("unchecked")
+@Test
+public void testAggregateSmallInputWithZeroTimeDifference() {
+    final StreamsBuilder builder = new StreamsBuilder();
+
+    // We use TimeWindow to represent the "windowed KTable" internally, so, the window size must be greater than 0 here

Review comment:
   Seems this comment needs to be updated?








[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689940858



##
File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
##
@@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat
 IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
 }
 
+private PartitionRegistration newPartition(int[] replicas) {
+    return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
+}
+
+@Test
+public void testLocalReplicaChanges() {
+int localId = 3;
+Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+List<TopicImage> topics = new ArrayList<>(TOPIC_IMAGES1);
+topics.add(
+newTopicImage(
+"foo",
+newFooId,
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 2}),
+newPartition(new int[] {0, 1, 2})
+)
+);
+
+TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
+// foo-0 - follower to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(0)
+  .setLeader(3),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-1 - leader to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(1)
+  .setLeader(1),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-2 - follower to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(2)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-3 - leader to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(3)
+  .setLeader(0)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-4 - not replica to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(4)
+  .setLeader(3)
+  .setIsr(Arrays.asList(3, 1, 2))
+  .setReplicas(Arrays.asList(3, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-5 - not replica to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(5)
+  .setIsr(Arrays.asList(0, 1, 3))
+  .setReplicas(Arrays.asList(0, 1, 3)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+
+/* Changes already included in DELTA1_RECORDS:
+ * foo - topic id deleted
+ * bar-0 - stay as follower with different partition epoch
+ * baz-0 - new topic to leader
+ */
+
+// baz-1 - new topic to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionRecord()
+.setPartitionId(1)
+.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+.setReplicas(Arrays.asList(4, 2, 3))
+.setIsr(Arrays.asList(4, 2, 3))
+.setLeader(4)
+.setLeaderEpoch(2)
+.setPartitionEpoch(1),
+PARTITION_RECORD.highestSupportedVersion()
+)
+);
+
+TopicsDelta delta = new TopicsDelta(image);
+RecordTestUtils.replayAll(delta, topicRecords);
+
+LocalReplicaChanges changes = delta.localChanges(localId);
+assertEquals(
+new HashSet<>(
+

[jira] [Created] (KAFKA-13208) Use TopicIdPartition instead of TopicPartition when computing the topic delta

2021-08-16 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13208:
--

 Summary: Use TopicIdPartition instead of TopicPartition when computing the topic delta
 Key: KAFKA-13208
 URL: https://issues.apache.org/jira/browse/KAFKA-13208
 Project: Kafka
  Issue Type: Improvement
  Components: kraft, replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


{{TopicPartition}} is used as the key when computing the local changes in {{TopicsDelta}}. The topic id is included in the Map value returned by {{localChanges}}. I think that the handling of this code and the corresponding code in {{ReplicaManager}} could be simplified if {{localChanges}} instead returned something like
{code:java}
{
  deletes: Set[TopicIdPartition],
  leaders: Map[TopicIdPartition, PartitionRegistration],
  followers: Map[TopicIdPartition, PartitionRegistration] 
}{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689937435



##
File path: core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
##
@@ -410,6 +421,32 @@ class KRaftClusterTest {
 }
   }
 
+  private def checkReplicaManager(cluster: KafkaClusterTestKit, expectedHosting: List[(Int, List[Boolean])]): Unit = {
+    for ((brokerId, partitionsIsHosted) <- expectedHosting) {
+      val broker = cluster.brokers().get(brokerId)
+      // lock and unlock so we can read the replica manager

Review comment:
   I think we need to do this because `var replicaManager` in `BrokerServer` is set in a different thread by `KafkaClusterTestKit`. If my understanding of the Java Memory Model is correct, we need to lock/synchronize so that we force a memory barrier.
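   A minimal Java sketch of the idiom (field and lock names assumed; the actual test is Scala): acquiring and releasing the same lock the writing thread used creates the happens-before edge that makes the write visible.
   
   ```java
   import java.util.concurrent.locks.ReentrantLock;
   
   public class VisibilityViaLockSketch {
       private final ReentrantLock lock = new ReentrantLock();
       private Object replicaManager; // written by another thread while holding `lock`
   
       Object readReplicaManager() {
           // Lock and immediately unlock: the empty critical section exists only to
           // force the memory barrier that publishes the other thread's write.
           lock.lock();
           lock.unlock();
           return replicaManager;
       }
   }
   ```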








[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689870784



##
File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
##
@@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat
 IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
 }
 
+private PartitionRegistration newPartition(int[] replicas) {
+    return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
+}
+
+@Test
+public void testLocalReplicaChanges() {
+int localId = 3;
+Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+List<TopicImage> topics = new ArrayList<>(TOPIC_IMAGES1);
+topics.add(
+newTopicImage(
+"foo",
+newFooId,
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 2}),
+newPartition(new int[] {0, 1, 2})
+)
+);
+
+TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
+// foo-0 - follower to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(0)
+  .setLeader(3),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-1 - leader to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(1)
+  .setLeader(1),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-2 - follower to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(2)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-3 - leader to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(3)
+  .setLeader(0)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-4 - not replica to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(4)
+  .setLeader(3)
+  .setIsr(Arrays.asList(3, 1, 2))
+  .setReplicas(Arrays.asList(3, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-5 - not replica to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(5)
+  .setIsr(Arrays.asList(0, 1, 3))
+  .setReplicas(Arrays.asList(0, 1, 3)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+
+/* Changes already included in DELTA1_RECORDS:
+ * foo - topic id deleted
+ * bar-0 - stay as follower with different partition epoch
+ * baz-0 - new topic to leader
+ */
+
+// baz-1 - new topic to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionRecord()
+.setPartitionId(1)
+.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+.setReplicas(Arrays.asList(4, 2, 3))
+.setIsr(Arrays.asList(4, 2, 3))
+.setLeader(4)
+.setLeaderEpoch(2)
+.setPartitionEpoch(1),
+PARTITION_RECORD.highestSupportedVersion()
+)
+);
+
+TopicsDelta delta = new TopicsDelta(image);
+RecordTestUtils.replayAll(delta, topicRecords);
+
+LocalReplicaChanges changes = delta.localChanges(localId);
+assertEquals(
+new HashSet<>(
+

[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-08-16 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400039#comment-17400039
 ] 

Ismael Juma commented on KAFKA-12713:
-

I had missed that. Worth bumping that thread for feedback.

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Assignee: Kai Huang
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With ACK=ALL, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch latency, because a fetch sometimes needs to stay in purgatory and wait for replica.fetch.wait.max.ms when data is not available. This greatly affects the real P50, P99, etc.
> I'd like to propose a KIP to track the real fetch latency for both broker followers and consumers.
>  





[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-08-16 Thread Kai Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17400037#comment-17400037
 ] 

Kai Huang commented on KAFKA-12713:
---

[~ijuma] here is the [discussion 
thread|https://lists.apache.org/thread.html/r261915b64c819129bc6017adaa12e8f9a0feb74c24ba331f4a08f30c%40%3Cdev.kafka.apache.org%3E]
 that Ming created a while ago.

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Assignee: Kai Huang
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With ACK=ALL, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch latency, because a fetch sometimes needs to stay in purgatory and wait for replica.fetch.wait.max.ms when data is not available. This greatly affects the real P50, P99, etc.
> I'd like to propose a KIP to track the real fetch latency for both broker followers and consumers.
>  





[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689907759



##
File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
##
@@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat
 IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
 }
 
+private PartitionRegistration newPartition(int[] replicas) {
+    return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
+}
+
+@Test
+public void testLocalReplicaChanges() {
+int localId = 3;
+Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+List<TopicImage> topics = new ArrayList<>(TOPIC_IMAGES1);
+topics.add(
+newTopicImage(
+"foo",
+newFooId,
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 2}),
+newPartition(new int[] {0, 1, 2})
+)
+);
+
+TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
+// foo-0 - follower to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(0)
+  .setLeader(3),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-1 - leader to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(1)
+  .setLeader(1),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-2 - follower to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(2)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-3 - leader to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(3)
+  .setLeader(0)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-4 - not replica to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(4)
+  .setLeader(3)
+  .setIsr(Arrays.asList(3, 1, 2))
+  .setReplicas(Arrays.asList(3, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-5 - not replica to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(5)
+  .setIsr(Arrays.asList(0, 1, 3))
+  .setReplicas(Arrays.asList(0, 1, 3)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+
+/* Changes already included in DELTA1_RECORDS:
+ * foo - topic id deleted
+ * bar-0 - stay as follower with different partition epoch
+ * baz-0 - new topic to leader
+ */
+
+// baz-1 - new topic to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionRecord()
+.setPartitionId(1)
+.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+.setReplicas(Arrays.asList(4, 2, 3))
+.setIsr(Arrays.asList(4, 2, 3))
+.setLeader(4)
+.setLeaderEpoch(2)
+.setPartitionEpoch(1),
+PARTITION_RECORD.highestSupportedVersion()
+)
+);
+
+TopicsDelta delta = new TopicsDelta(image);
+RecordTestUtils.replayAll(delta, topicRecords);
+
+LocalReplicaChanges changes = delta.localChanges(localId);
+assertEquals(
+new HashSet<>(
+

[GitHub] [kafka] hachikuji commented on a change in pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher

2021-08-16 Thread GitBox


hachikuji commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r689886267



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -268,7 +262,10 @@ abstract class AbstractFetcherThread(name: String,
     val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
-    fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
+    // Partitions may have been removed from the fetcher while the thread was waiting for fetch
+    // response. Filter out removed partitions while holding `partitionMapLock` to ensure that we
+    // don't update state for any partition that may have already been migrated to another thread.
+    fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }.forKeyValue { (tp, leaderEpochOffset) =>

Review comment:
   Related to David's comment: I think the `filter` here still builds a collection. Alternatively, we could move the check into `forKeyValue`. Perhaps it would even be useful to have a `trace`-level log message in the `else` case when we ignore the result.
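   A small Java sketch of the suggested shape (the PR code is Scala; names simplified): the membership check moves inside the single pass, so no filtered copy is built, and the `else` branch gets a trace-style message.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   
   public class InlineCheckSketch {
       public static void main(String[] args) {
           Map<String, Long> fetchedEpochs = new HashMap<>();
           fetchedEpochs.put("topic-0", 42L);
           fetchedEpochs.put("topic-1", 17L);
           Set<String> partitionStates = Set.of("topic-0");
   
           // One pass, no intermediate collection:
           fetchedEpochs.forEach((tp, epochEndOffset) -> {
               if (partitionStates.contains(tp)) {
                   System.out.println("truncate " + tp + " using " + epochEndOffset);
               } else {
                   System.out.println("TRACE ignoring " + tp + ": removed from fetcher");
               }
           });
       }
   }
   ```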








[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689874304



##
File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
##
@@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat
 IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
 }
 
+private PartitionRegistration newPartition(int[] replicas) {
+    return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
+}
+
+@Test
+public void testLocalReplicaChanges() {
+int localId = 3;
+Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+List<TopicImage> topics = new ArrayList<>(TOPIC_IMAGES1);
+topics.add(
+newTopicImage(
+"foo",
+newFooId,
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 2}),
+newPartition(new int[] {0, 1, 2})
+)
+);
+
+TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
+// foo-0 - follower to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(0)
+  .setLeader(3),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-1 - leader to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(1)
+  .setLeader(1),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-2 - follower to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(2)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-3 - leader to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(3)
+  .setLeader(0)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-4 - not replica to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(4)
+  .setLeader(3)
+  .setIsr(Arrays.asList(3, 1, 2))
+  .setReplicas(Arrays.asList(3, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-5 - not replica to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(5)
+  .setIsr(Arrays.asList(0, 1, 3))
+  .setReplicas(Arrays.asList(0, 1, 3)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+
+/* Changes already included in DELTA1_RECORDS:
+ * foo - topic id deleted
+ * bar-0 - stay as follower with different partition epoch
+ * baz-0 - new topic to leader
+ */
+
+// baz-1 - new topic to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionRecord()
+.setPartitionId(1)
+.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+.setReplicas(Arrays.asList(4, 2, 3))
+.setIsr(Arrays.asList(4, 2, 3))
+.setLeader(4)
+.setLeaderEpoch(2)
+.setPartitionEpoch(1),
+PARTITION_RECORD.highestSupportedVersion()
+)
+);
+
+TopicsDelta delta = new TopicsDelta(image);
+RecordTestUtils.replayAll(delta, topicRecords);
+
+LocalReplicaChanges changes = delta.localChanges(localId);
+assertEquals(
+new HashSet<>(
+

[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689871542



##
File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
##
@@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat
 IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
 }
 
+private PartitionRegistration newPartition(int[] replicas) {
+    return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
+}
+
+@Test
+public void testLocalReplicaChanges() {
+int localId = 3;
+Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+List<TopicImage> topics = new ArrayList<>(TOPIC_IMAGES1);
+topics.add(
+newTopicImage(
+"foo",
+newFooId,
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 2}),
+newPartition(new int[] {0, 1, 2})
+)
+);
+
+TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
+// foo-0 - follower to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(0)
+  .setLeader(3),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-1 - leader to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(1)
+  .setLeader(1),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-2 - follower to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(2)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-3 - leader to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(3)
+  .setLeader(0)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-4 - not replica to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(4)
+  .setLeader(3)
+  .setIsr(Arrays.asList(3, 1, 2))
+  .setReplicas(Arrays.asList(3, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-5 - not replica to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(5)
+  .setIsr(Arrays.asList(0, 1, 3))
+  .setReplicas(Arrays.asList(0, 1, 3)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+
+/* Changes already included in DELTA1_RECORDS:
+ * foo - topic id deleted
+ * bar-0 - stay as follower with different partition epoch
+ * baz-0 - new topic to leader
+ */
+
+// baz-1 - new topic to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionRecord()
+.setPartitionId(1)
+.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+.setReplicas(Arrays.asList(4, 2, 3))
+.setIsr(Arrays.asList(4, 2, 3))
+.setLeader(4)
+.setLeaderEpoch(2)
+.setPartitionEpoch(1),
+PARTITION_RECORD.highestSupportedVersion()
+)
+);
+
+TopicsDelta delta = new TopicsDelta(image);
+RecordTestUtils.replayAll(delta, topicRecords);
+
+LocalReplicaChanges changes = delta.localChanges(localId);
+assertEquals(
+new HashSet<>(
+

[GitHub] [kafka] rajinisivaram commented on a change in pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher

2021-08-16 Thread GitBox


rajinisivaram commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r689860826



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -229,9 +229,16 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  protected def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
+  // Visibility for unit tests
+  protected[server] def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, EpochEndOffset]): Unit = {
     inLock(partitionMapLock) {
-      val ResultWithPartitions(fetchOffsets, partitionsWithError) = maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+      // Partitions may have been removed from the fetcher while the thread was waiting for fetch
+      // response. Filter out removed partitions while holding `partitionMapLock` to ensure that we
+      // don't update state for any partition that may have already been migrated to another thread.
+      val filteredEpochEndOffsets = epochEndOffsets.filter { case (tp, _) =>
+        partitionStates.contains(tp)
+      }

Review comment:
   @dajac Thanks for the review, updated.








[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689841279



##
File path: metadata/src/test/java/org/apache/kafka/image/TopicsImageTest.java
##
@@ -126,6 +133,158 @@ private static TopicImage newTopicImage(String name, Uuid id, PartitionRegistrat
 IMAGE2 = new TopicsImage(newTopicsByIdMap(topics2), newTopicsByNameMap(topics2));
 }
 
+private PartitionRegistration newPartition(int[] replicas) {
+    return new PartitionRegistration(replicas, replicas, Replicas.NONE, Replicas.NONE, replicas[0], 1, 1);
+}
+
+@Test
+public void testLocalReplicaChanges() {
+int localId = 3;
+Uuid newFooId = Uuid.fromString("0hHJ3X5ZQ-CFfQ5xgpj90w");
+
+List<TopicImage> topics = new ArrayList<>(TOPIC_IMAGES1);
+topics.add(
+newTopicImage(
+"foo",
+newFooId,
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 3}),
+newPartition(new int[] {3, 1, 2}),
+newPartition(new int[] {0, 1, 2}),
+newPartition(new int[] {0, 1, 2})
+)
+);
+
+TopicsImage image = new TopicsImage(newTopicsByIdMap(topics), newTopicsByNameMap(topics));
+
+List<ApiMessageAndVersion> topicRecords = new ArrayList<>(DELTA1_RECORDS);
+// foo-0 - follower to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(0)
+  .setLeader(3),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-1 - leader to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(1)
+  .setLeader(1),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-2 - follower to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(2)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-3 - leader to removed
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(3)
+  .setLeader(0)
+  .setIsr(Arrays.asList(0, 1, 2))
+  .setReplicas(Arrays.asList(0, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-4 - not replica to leader
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(4)
+  .setLeader(3)
+  .setIsr(Arrays.asList(3, 1, 2))
+  .setReplicas(Arrays.asList(3, 1, 2)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+// foo-5 - not replica to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionChangeRecord()
+  .setTopicId(newFooId)
+  .setPartitionId(5)
+  .setIsr(Arrays.asList(0, 1, 3))
+  .setReplicas(Arrays.asList(0, 1, 3)),
+PARTITION_CHANGE_RECORD.highestSupportedVersion()
+)
+);
+
+/* Changes already included in DELTA1_RECORDS:
+ * foo - topic id deleted
+ * bar-0 - stay as follower with different partition epoch
+ * baz-0 - new topic to leader
+ */
+
+// baz-1 - new topic to follower
+topicRecords.add(
+new ApiMessageAndVersion(
+new PartitionRecord()
+.setPartitionId(1)
+.setTopicId(Uuid.fromString("tgHBnRglT5W_RlENnuG5vg"))
+.setReplicas(Arrays.asList(4, 2, 3))
+.setIsr(Arrays.asList(4, 2, 3))
+.setLeader(4)
+.setLeaderEpoch(2)
+.setPartitionEpoch(1),
+PARTITION_RECORD.highestSupportedVersion()
+)
+);
+
+TopicsDelta delta = new TopicsDelta(image);
+RecordTestUtils.replayAll(delta, topicRecords);
+
+LocalReplicaChanges changes = delta.localChanges(localId);
+assertEquals(
+new HashSet<>(
+

[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-08-16 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1730#comment-1730
 ] 

Ismael Juma commented on KAFKA-12713:
-

[~kaihuang] Thanks for the KIP. I suggest starting a discuss thread for KIP-736 
and we can discuss it further over there. :)

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Assignee: Kai Huang
>Priority: Major
>
> Fetch latency is an important metric to monitor for cluster performance. With ACK=ALL, produce latency is affected primarily by broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch latency, because a fetch sometimes needs to stay in purgatory and wait for replica.fetch.wait.max.ms when data is not available. This greatly affects the real P50, P99, etc.
> I'd like to propose a KIP to track the real fetch latency for both broker followers and consumers.
>  





[jira] [Commented] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode

2021-08-16 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399978#comment-17399978
 ] 

Jun Rao commented on KAFKA-13206:
-

It's probably after the controller has acted on the controlled shutdown request.

> shutting down broker needs to stop fetching as a follower in KRaft mode
> ---
>
> Key: KAFKA-13206
> URL: https://issues.apache.org/jira/browse/KAFKA-13206
> Project: Kafka
>  Issue Type: Bug
>  Components: core, kraft, replication
>Affects Versions: 3.0.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: kip-500
>
> In ZK mode, the controller sends a stopReplica request (with the deletion 
> flag set to false) to the shutting-down broker so that it stops fetching as a 
> follower. In KRaft mode, we don't have corresponding logic. This means 
> unnecessarily rejected follower fetch requests during controlled shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode

2021-08-16 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-13206:
---
Component/s: replication
 kraft

> shutting down broker needs to stop fetching as a follower in KRaft mode
> ---
>
> Key: KAFKA-13206
> URL: https://issues.apache.org/jira/browse/KAFKA-13206
> Project: Kafka
>  Issue Type: Bug
>  Components: core, kraft, replication
>Affects Versions: 3.0.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: kip-500
>
> In ZK mode, the controller sends a stopReplica request (with the deletion 
> flag set to false) to the shutting-down broker so that it stops fetching as a 
> follower. In KRaft mode, we don't have corresponding logic. This means 
> unnecessarily rejected follower fetch requests during controlled shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode

2021-08-16 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399976#comment-17399976
 ] 

Jose Armando Garcia Sancio commented on KAFKA-13206:


Thanks for the issue, [~junrao]. When is it safe to "stop" this replica without 
deleting? Is it when the {{ReplicaManager}} is {{isShuttingDown}} and the 
replica is a follower?

> shutting down broker needs to stop fetching as a follower in KRaft mode
> ---
>
> Key: KAFKA-13206
> URL: https://issues.apache.org/jira/browse/KAFKA-13206
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: kip-500
>
> In ZK mode, the controller sends a stopReplica request (with the deletion 
> flag set to false) to the shutting-down broker so that it stops fetching as a 
> follower. In KRaft mode, we don't have corresponding logic. This means 
> unnecessarily rejected follower fetch requests during controlled shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rondagostino opened a new pull request #11222: MINOR: Test ReplicaManager MBean names

2021-08-16 Thread GitBox


rondagostino opened a new pull request #11222:
URL: https://github.com/apache/kafka/pull/11222


   This patch closes a testing gap by confirming that ReplicaManager metrics 
are exposed with the expected MBean names.
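
   A minimal, self-contained JMX sketch of this kind of check (the ObjectName 
   pattern and the metric name below are assumptions based on Kafka's usual 
   kafka.server naming, not taken from this patch):

       import java.lang.management.ManagementFactory;
       import java.util.Set;
       import javax.management.MBeanServer;
       import javax.management.ObjectName;

       public class ReplicaManagerMBeanCheck {
           public static void main(String[] args) throws Exception {
               // Query the platform MBean server for all ReplicaManager metrics.
               MBeanServer server = ManagementFactory.getPlatformMBeanServer();
               Set<ObjectName> names = server.queryNames(
                   new ObjectName("kafka.server:type=ReplicaManager,name=*"), null);
               // Check that one expected metric is registered under that name.
               boolean found = names.stream().anyMatch(n ->
                   "UnderReplicatedPartitions".equals(n.getKeyProperty("name")));
               System.out.println("UnderReplicatedPartitions present: " + found);
           }
       }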
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


jsancio commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689787245



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
 public Set<Uuid> deletedTopicIds() {
 return deletedTopicIds;
 }
+
+/**
+ * Find the topic partitions that have change base on the replica given.
+ *
+ * The changes identified are:
+ *   1. topic partitions for which the broker is not a replica anymore
+ *   2. topic partitions for which the broker is now the leader
+ *   3. topic partitions for which the broker is now a follower
+ *
+ * @param brokerId the broker id
+ * @return the list of topic partitions which the broker should remove, 
become leader or become follower.
+ */
+public LocalReplicaChanges localChanges(int brokerId) {
+Set<TopicPartition> deletes = new HashSet<>();
+Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
+Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
+
+for (TopicDelta delta : changedTopics.values()) {
+LocalReplicaChanges changes = delta.localChanges(brokerId);
+
+deletes.addAll(changes.deletes());
+leaders.putAll(changes.leaders());
+followers.putAll(changes.followers());
+}
+
+// Add all of the deleted topic partitions to the map of locally 
removed partitions
+deletedTopicIds().forEach(topicId -> {
+TopicImage topicImage = image().getTopic(topicId);
+topicImage.partitions().keySet().forEach(partitionId -> {
+deletes.add(new TopicPartition(topicImage.name(), 
partitionId));

Review comment:
   Yes. Fixed and updated the test in `TopicsImageTest` to check for this 
case.

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3020,34 +2949,183 @@ class ReplicaManagerTest {
 TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testDeltaFollowerToNotReplica(): Unit = {
+val localId = 1
+val otherId = localId + 1
+val topicPartition = new TopicPartition("foo", 0)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+try {
+  // Make the local replica the follower
+  val followerTopicsDelta = topicsCreateDelta(localId, false)
+  val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+  replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+  // Check the state of that partition and fetcher
+  val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+  assertFalse(followerPartition.isLeader)
+  assertEquals(0, followerPartition.getLeaderEpoch)
+
+  val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+  assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
fetcher.map(_.sourceBroker))
+
+  // Apply changes that remove replica
+  val notReplicaTopicsDelta = 
topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
+  val notReplicaMetadataImage = 
imageFromTopics(notReplicaTopicsDelta.apply())
+  replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+  // Check that the partition was removed
+  assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
+  assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+  assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+} finally {
+  replicaManager.shutdown()
+}
+
+TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaFollowerRemovedTopic(): Unit = {
+val localId = 1
+val otherId = localId + 1
+val topicPartition = new TopicPartition("foo", 0)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+try {
+  // Make the local replica the follower
+  val followerTopicsDelta = topicsCreateDelta(localId, false)
+  val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+  replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+  // Check the state of that partition and fetcher
+  val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+  assertFalse(followerPartition.isLeader)
+  assertEquals(0, followerPartition.getLeaderEpoch)
+
+  val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+  assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
fetcher.map(_.sourceBroker))
+
+  // Apply changes that remove topic and replica
+  val removeTopicsDelta = topicsDeleteDelta(followerMetadataImage.topics())
+  val removeMetadataImage = 

[GitHub] [kafka] cmccabe merged pull request #11219: MINOR: clarify assertion in handleListPartitionReassignmentsRequest

2021-08-16 Thread GitBox


cmccabe merged pull request #11219:
URL: https://github.com/apache/kafka/pull/11219


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-08-16 Thread Kai Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399940#comment-17399940
 ] 

Kai Huang edited comment on KAFKA-12713 at 8/16/21, 6:48 PM:
-

[~ijuma] I would like to follow up on this ticket, and continue the discussion. 
I replied to the 
[discussion|https://lists.apache.org/thread.html/r7f82dde9133bf9d3a8b688ca7ae02ad761c52e7b79212c9247b276c5%40%3Cdev.kafka.apache.org%3E]
 thread and illustrated how the fetch latency metric should work in 
[KIP-736|https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Report+the+true+end+to+end+fetch+latency].
 Could you please take a look and see if that clarifies your question?


was (Author: kaihuang):
[~ijuma] I would like to follow up on this ticket, and continue the discussion. 
I replied to your 
[discussion|https://lists.apache.org/thread.html/r7f82dde9133bf9d3a8b688ca7ae02ad761c52e7b79212c9247b276c5%40%3Cdev.kafka.apache.org%3E]
 thread and illustrated how the fetch latency metric should work in 
[KIP-736|https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Report+the+true+end+to+end+fetch+latency].
 Could you please take a look and see if that clarifies your question?

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Assignee: Kai Huang
>Priority: Major
>
> The fetch latency is an important metric to monitor for cluster 
> performance. With ACK=ALL, the produce latency is affected primarily by 
> broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch 
> latency, because a fetch sometimes needs to stay in purgatory and wait for 
> replica.fetch.wait.max.ms when data is not available. This greatly affects the 
> real P50, P99, etc. 
> I'd like to propose a KIP to be able to track the real fetch latency for both 
> broker followers and consumers. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12713) Report "REAL" follower/consumer fetch latency

2021-08-16 Thread Kai Huang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12713?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399940#comment-17399940
 ] 

Kai Huang commented on KAFKA-12713:
---

[~ijuma] I would like to follow up on this ticket, and continue the discussion. 
I replied to your 
[discussion|https://lists.apache.org/thread.html/r7f82dde9133bf9d3a8b688ca7ae02ad761c52e7b79212c9247b276c5%40%3Cdev.kafka.apache.org%3E]
 thread and illustrated how the fetch latency metric should work in 
[KIP-736|https://cwiki.apache.org/confluence/display/KAFKA/KIP-736%3A+Report+the+true+end+to+end+fetch+latency].
 Could you please take a look and see if that clarifies your question?

> Report "REAL" follower/consumer fetch latency
> -
>
> Key: KAFKA-12713
> URL: https://issues.apache.org/jira/browse/KAFKA-12713
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ming Liu
>Assignee: Kai Huang
>Priority: Major
>
> The fetch latency is an important metric to monitor for cluster 
> performance. With ACK=ALL, the produce latency is affected primarily by 
> broker fetch latency.
> However, the currently reported fetch latency doesn't reflect the true fetch 
> latency, because a fetch sometimes needs to stay in purgatory and wait for 
> replica.fetch.wait.max.ms when data is not available. This greatly affects the 
> real P50, P99, etc. 
> I'd like to propose a KIP to be able to track the real fetch latency for both 
> broker followers and consumers. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on a change in pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher

2021-08-16 Thread GitBox


dajac commented on a change in pull request #11221:
URL: https://github.com/apache/kafka/pull/11221#discussion_r689770112



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -229,9 +229,16 @@ abstract class AbstractFetcherThread(name: String,
 }
   }
 
-  protected def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, 
EpochEndOffset]): Unit = {
+  // Visibility for unit tests
+  protected[server] def truncateOnFetchResponse(epochEndOffsets: 
Map[TopicPartition, EpochEndOffset]): Unit = {
 inLock(partitionMapLock) {
-  val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
+  // Partitions may have been removed from the fetcher while the thread 
was waiting for fetch
+  // response. Filter out removed partitions while holding 
`partitionMapLock` to ensure that we
+  // don't update state for any partition that may have already been 
migrated to another thread.
+  val filteredEpochEndOffsets = epochEndOffsets.filter { case (tp, _) =>
+partitionStates.contains(tp)
+  }

Review comment:
   Did you consider pushing this down into 
`maybeTruncateToEpochEndOffsets`? This would avoid creating an extra collection 
here.
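
   For illustration, a self-contained comparison of the two approaches (in 
   Java, with stand-in collections rather than the fetcher's real fields): 
   pre-filtering materializes an intermediate map, while checking membership 
   inside the iteration does not.

       import java.util.HashMap;
       import java.util.Map;
       import java.util.Set;

       public class FilterWhileIterating {
           public static void main(String[] args) {
               Set<String> partitionStates = Set.of("foo-0", "bar-0");
               Map<String, Long> epochEndOffsets = Map.of("foo-0", 10L, "baz-0", 5L);

               // Option 1: pre-filter, which allocates an intermediate map.
               Map<String, Long> filtered = new HashMap<>();
               epochEndOffsets.forEach((tp, offset) -> {
                   if (partitionStates.contains(tp)) filtered.put(tp, offset);
               });
               System.out.println("filtered = " + filtered);

               // Option 2: push the ownership check into the iteration itself.
               epochEndOffsets.forEach((tp, offset) -> {
                   if (partitionStates.contains(tp)) {
                       System.out.println("truncate " + tp + " to " + offset);
                   }
               });
           }
       }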




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler

2021-08-16 Thread Ludo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399913#comment-17399913
 ] 

Ludo commented on KAFKA-13195:
--

Thanks for all the clarification, really appreciated.
I will go with the Transformer approach for now (I've seen that I'll have some 
time for it later today); I just need to apply it everywhere (since I have a 
lot of Transformers in my app).


> StateSerde don't honor DeserializationExceptionHandler
> --
>
> Key: KAFKA-13195
> URL: https://issues.apache.org/jira/browse/KAFKA-13195
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Ludo
>Priority: Major
>
> Kafka Streams allows configuring a 
> [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling]
>  
> When you are using a StateStore, most messages will be a copy of the original 
> message in an internal topic and will mostly use the same serializer, even if 
> the message is of another type. 
> You can see 
> [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161]
>  that StateSerde uses the raw Deserializer and does not honor 
> {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, 
> leading the application to crash (reaching the 
> {{setUncaughtExceptionHandler}} method).
> I think the state store must have the same behavior as the 
> {{RecordDeserializer}} and honor the DeserializationExceptionHandler.
>  
> Stacktrace (coming from kafka stream 2.6.1) :
>  
> {code:java}
> Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_14, processor=workertaskjoined-repartition-source, 
> topic=kestra_executor-workertaskjoined-repartition, partition=14, 
> offset=167500, 
> stacktrace=org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize 
> value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String 
> "txt": not one of the values accepted for Enum class: 
> [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: 
> (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through 
> reference chain: 
> io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
>  at 
> com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
>  at 
> com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
>  at 
> com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
>  at 
> com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449)
>  at 
> 

[jira] [Commented] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode

2021-08-16 Thread Jun Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399908#comment-17399908
 ] 

Jun Rao commented on KAFKA-13206:
-

[~cmccabe] mentioned the following. The broker being shut down in kraft 
controlled shutdown usually does continue trying to fetch, but that's because 
we shut down its metadata listener (that might be something we should not do, 
to optimize more...).

 

So, as long as it doesn't degrade rolling performance noticeably, this might 
not be an issue.

> shutting down broker needs to stop fetching as a follower in KRaft mode
> ---
>
> Key: KAFKA-13206
> URL: https://issues.apache.org/jira/browse/KAFKA-13206
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: kip-500
>
> In ZK mode, the controller sends a stopReplica request (with the deletion 
> flag set to false) to the shutting-down broker so that it stops fetching as a 
> follower. In KRaft mode, we don't have corresponding logic. This means 
> unnecessarily rejected follower fetch requests during controlled shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler

2021-08-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399902#comment-17399902
 ] 

Matthias J. Sax commented on KAFKA-13195:
-

Your transformer is in the stacktrace:
{quote}io.kestra.runner.kafka.streams.FlowWithTriggerTransformer.transform(FlowWithTriggerTransformer.java:39)
 at {quote}
Thus, you should be able to catch `StreamsException` when you call 
`store.get()/put()`, check whether its cause is a `SerializationException`, and 
react to it. If it's a different exception type, you would rethrow.
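
A minimal sketch of that pattern (hypothetical helper, not the reporter's 
actual code):
{code:java}
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.state.KeyValueStore;

public class SafeStoreAccess {
    // Read from the store; skip entries whose bytes can no longer be
    // deserialized instead of letting the StreamsException kill the thread.
    static <K, V> V getOrSkip(final KeyValueStore<K, V> store, final K key) {
        try {
            return store.get(key);
        } catch (final StreamsException e) {
            if (e.getCause() instanceof SerializationException) {
                return null; // poison pill: skip (or send to a dead letter queue)
            }
            throw e; // different failure: rethrow
        }
    }
}
{code}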
{quote} * Reset the application is also not an option, since the 
deserialization will failed on deserialization on my case even with 
reprocessing (still no idea of what plugins have done).{quote}
Why? For this case, you could use the deserialization handler to skip poison 
pills.
{quote}In a real world, stream application must never crashed, and give the 
control to handle invalid data on state store (skip, dead letter queue, delete 
from state store ...) is a real need in my opinion.
{quote}
I agree in general, but I (personally) don't think that applying a handler to 
the state store is the right solution. We can keep this ticket open though, to 
see if there is demand for such a feature.

> StateSerde don't honor DeserializationExceptionHandler
> --
>
> Key: KAFKA-13195
> URL: https://issues.apache.org/jira/browse/KAFKA-13195
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Ludo
>Priority: Major
>
> Kafka Streams allows configuring a 
> [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling]
>  
> When you are using a StateStore, most messages will be a copy of the original 
> message in an internal topic and will mostly use the same serializer, even if 
> the message is of another type. 
> You can see 
> [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161]
>  that StateSerde uses the raw Deserializer and does not honor 
> {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, 
> leading the application to crash (reaching the 
> {{setUncaughtExceptionHandler}} method).
> I think the state store must have the same behavior as the 
> {{RecordDeserializer}} and honor the DeserializationExceptionHandler.
>  
> Stacktrace (coming from kafka stream 2.6.1) :
>  
> {code:java}
> Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_14, processor=workertaskjoined-repartition-source, 
> topic=kestra_executor-workertaskjoined-repartition, partition=14, 
> offset=167500, 
> stacktrace=org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize 
> value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String 
> "txt": not one of the values accepted for Enum class: 
> [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: 
> (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through 
> reference chain: 
> io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
>  at 
> com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
>  at 
> com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
>  

[jira] [Updated] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler

2021-08-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-13195:

Issue Type: Improvement  (was: Bug)

> StateSerde don't honor DeserializationExceptionHandler
> --
>
> Key: KAFKA-13195
> URL: https://issues.apache.org/jira/browse/KAFKA-13195
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Ludo
>Priority: Major
>
> Kafka Streams allows configuring a 
> [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling]
>  
> When you are using a StateStore, most messages will be a copy of the original 
> message in an internal topic and will mostly use the same serializer, even if 
> the message is of another type. 
> You can see 
> [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161]
>  that StateSerde uses the raw Deserializer and does not honor 
> {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}, 
> leading the application to crash (reaching the 
> {{setUncaughtExceptionHandler}} method).
> I think the state store must have the same behavior as the 
> {{RecordDeserializer}} and honor the DeserializationExceptionHandler.
>  
> Stacktrace (coming from kafka stream 2.6.1) :
>  
> {code:java}
> Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_14, processor=workertaskjoined-repartition-source, 
> topic=kestra_executor-workertaskjoined-repartition, partition=14, 
> offset=167500, 
> stacktrace=org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize 
> value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String 
> "txt": not one of the values accepted for Enum class: 
> [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: 
> (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through 
> reference chain: 
> io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
>  at 
> com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
>  at 
> com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:324)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeOther(BeanDeserializer.java:225)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:197)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer._deserializeTypedForId(AsPropertyTypeDeserializer.java:137)
>  at 
> com.fasterxml.jackson.databind.jsontype.impl.AsPropertyTypeDeserializer.deserializeTypedFromObject(AsPropertyTypeDeserializer.java:107)
>  at 
> com.fasterxml.jackson.databind.deser.AbstractDeserializer.deserializeWithType(AbstractDeserializer.java:263)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer._deserializeFromArray(CollectionDeserializer.java:357)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:244)
>  at 
> com.fasterxml.jackson.databind.deser.std.CollectionDeserializer.deserialize(CollectionDeserializer.java:28)
>  at 
> com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:542)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:565)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:449)
>  at 
> com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1405)
>  at 
> 

[GitHub] [kafka] rajinisivaram opened a new pull request #11221: KAFKA-13207: Don't partition state on fetch response with diverging epoch if partition removed from fetcher

2021-08-16 Thread GitBox


rajinisivaram opened a new pull request #11221:
URL: https://github.com/apache/kafka/pull/11221


   `AbstractFetcherThread#truncateOnFetchResponse` is used with IBP 2.7 and 
above to truncate partitions based on diverging epoch returned in fetch 
responses. Truncation should only be performed for partitions that are still 
owned by the fetcher and this check should be done while holding 
`partitionMapLock` to ensure that any partitions removed from the fetcher 
thread are not truncated. The PR adds this check. Truncation will be performed 
by any new fetcher that owns the partition when it restarts fetching.
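
   Reduced to a self-contained Java sketch (stand-in names and types, not the 
   fetcher's real fields), the pattern is: the ownership check and the state 
   update must happen under the same lock, otherwise a partition removed 
   between the check and the update could still be truncated.

       import java.util.HashSet;
       import java.util.List;
       import java.util.Set;
       import java.util.concurrent.locks.ReentrantLock;

       public class OwnershipCheckUnderLock {
           private final ReentrantLock partitionMapLock = new ReentrantLock();
           private final Set<String> ownedPartitions = new HashSet<>(List.of("foo-0"));

           void truncateIfStillOwned(String partition, long offset) {
               partitionMapLock.lock();
               try {
                   // Check ownership and act while holding the same lock.
                   if (ownedPartitions.contains(partition)) {
                       System.out.println("truncating " + partition + " to " + offset);
                   }
               } finally {
                   partitionMapLock.unlock();
               }
           }

           public static void main(String[] args) {
               new OwnershipCheckUnderLock().truncateIfStillOwned("foo-0", 42L);
           }
       }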
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13207) Replica fetcher should not update partition state on diverging epoch if partition removed from fetcher

2021-08-16 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-13207:
--

 Summary: Replica fetcher should not update partition state on 
diverging epoch if partition removed from fetcher
 Key: KAFKA-13207
 URL: https://issues.apache.org/jira/browse/KAFKA-13207
 Project: Kafka
  Issue Type: New Feature
  Components: core
Affects Versions: 2.8.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 3.0.0, 2.8.1


{{AbstractFetcherThread#truncateOnFetchResponse}} is used with 
IBP 2.7 and above to truncate partitions based on the diverging epoch returned in 
fetch responses. Truncation should only be performed for partitions that are 
still owned by the fetcher, and this check should be done while holding 
{{partitionMapLock}} to ensure that any partitions 
removed from the fetcher thread are not truncated.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode

2021-08-16 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-13206:

Labels: kip-500  (was: )

> shutting down broker needs to stop fetching as a follower in KRaft mode
> ---
>
> Key: KAFKA-13206
> URL: https://issues.apache.org/jira/browse/KAFKA-13206
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 3.0.0
>Reporter: Jun Rao
>Priority: Major
>  Labels: kip-500
>
> In ZK mode, the controller sends a stopReplica request (with the deletion 
> flag set to false) to the shutting-down broker so that it stops fetching as a 
> follower. In KRaft mode, we don't have corresponding logic. This means 
> unnecessarily rejected follower fetch requests during controlled shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13206) shutting down broker needs to stop fetching as a follower in KRaft mode

2021-08-16 Thread Jun Rao (Jira)
Jun Rao created KAFKA-13206:
---

 Summary: shutting down broker needs to stop fetching as a follower 
in KRaft mode
 Key: KAFKA-13206
 URL: https://issues.apache.org/jira/browse/KAFKA-13206
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 3.0.0
Reporter: Jun Rao


In ZK mode, the controller sends a stopReplica request (with the deletion 
flag set to false) to the shutting-down broker so that it stops fetching as a 
follower. In KRaft mode, we don't have corresponding logic. This means 
unnecessarily rejected follower fetch requests during controlled shutdown.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #11216: KAFKA-13198: Stop replicas when reassigned

2021-08-16 Thread GitBox


junrao commented on a change in pull request #11216:
URL: https://github.com/apache/kafka/pull/11216#discussion_r689659905



##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
 public Set deletedTopicIds() {
 return deletedTopicIds;
 }
+
+/**
+ * Find the topic partitions that have change base on the replica given.

Review comment:
   base => based

##
File path: metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
##
@@ -162,4 +163,39 @@ public boolean topicWasDeleted(String topicName) {
 public Set<Uuid> deletedTopicIds() {
 return deletedTopicIds;
 }
+
+/**
+ * Find the topic partitions that have change base on the replica given.
+ *
+ * The changes identified are:
+ *   1. topic partitions for which the broker is not a replica anymore
+ *   2. topic partitions for which the broker is now the leader
+ *   3. topic partitions for which the broker is now a follower
+ *
+ * @param brokerId the broker id
+ * @return the list of topic partitions which the broker should remove, 
become leader or become follower.
+ */
+public LocalReplicaChanges localChanges(int brokerId) {
+Set<TopicPartition> deletes = new HashSet<>();
+Map<TopicPartition, LocalReplicaChanges.PartitionInfo> leaders = new HashMap<>();
+Map<TopicPartition, LocalReplicaChanges.PartitionInfo> followers = new HashMap<>();
+
+for (TopicDelta delta : changedTopics.values()) {
+LocalReplicaChanges changes = delta.localChanges(brokerId);
+
+deletes.addAll(changes.deletes());
+leaders.putAll(changes.leaders());
+followers.putAll(changes.followers());
+}
+
+// Add all of the deleted topic partitions to the map of locally 
removed partitions
+deletedTopicIds().forEach(topicId -> {
+TopicImage topicImage = image().getTopic(topicId);
+topicImage.partitions().keySet().forEach(partitionId -> {
+deletes.add(new TopicPartition(topicImage.name(), 
partitionId));

Review comment:
   Should we further check that the deleted partition has a replica on 
brokerId before adding it to deletes?
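
   A self-contained sketch of that extra check (stand-in types, not the 
   metadata image's real classes):

       import java.util.Arrays;
       import java.util.HashSet;
       import java.util.Map;
       import java.util.Set;

       public class DeletedPartitionFilter {
           // Only treat a deleted partition as a local delete if this broker
           // actually hosted one of its replicas.
           static Set<String> localDeletes(Map<String, int[]> deletedReplicas, int brokerId) {
               Set<String> deletes = new HashSet<>();
               deletedReplicas.forEach((partition, replicas) -> {
                   if (Arrays.stream(replicas).anyMatch(r -> r == brokerId)) {
                       deletes.add(partition);
                   }
               });
               return deletes;
           }

           public static void main(String[] args) {
               Map<String, int[]> deleted = Map.of(
                   "foo-0", new int[]{1, 2, 3},
                   "bar-0", new int[]{4, 5, 6});
               System.out.println(localDeletes(deleted, 1)); // prints [foo-0]
           }
       }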

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -3020,34 +2949,183 @@ class ReplicaManagerTest {
 TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  @Test
+  def testDeltaFollowerToNotReplica(): Unit = {
+val localId = 1
+val otherId = localId + 1
+val topicPartition = new TopicPartition("foo", 0)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+try {
+  // Make the local replica the follower
+  val followerTopicsDelta = topicsCreateDelta(localId, false)
+  val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+  replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+  // Check the state of that partition and fetcher
+  val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+  assertFalse(followerPartition.isLeader)
+  assertEquals(0, followerPartition.getLeaderEpoch)
+
+  val fetcher = 
replicaManager.replicaFetcherManager.getFetcher(topicPartition)
+  assertEquals(Some(BrokerEndPoint(otherId, "localhost", 9093)), 
fetcher.map(_.sourceBroker))
+
+  // Apply changes that remove replica
+  val notReplicaTopicsDelta = 
topicsChangeDelta(followerMetadataImage.topics(), otherId, true)
+  val notReplicaMetadataImage = 
imageFromTopics(notReplicaTopicsDelta.apply())
+  replicaManager.applyDelta(notReplicaMetadataImage, notReplicaTopicsDelta)
+
+  // Check that the partition was removed
+  assertEquals(HostedPartition.None, 
replicaManager.getPartition(topicPartition))
+  assertEquals(None, 
replicaManager.replicaFetcherManager.getFetcher(topicPartition))
+  assertEquals(None, replicaManager.logManager.getLog(topicPartition))
+} finally {
+  replicaManager.shutdown()
+}
+
+TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+  }
+
+  @Test
+  def testDeltaFollowerRemovedTopic(): Unit = {
+val localId = 1
+val otherId = localId + 1
+val topicPartition = new TopicPartition("foo", 0)
+val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), localId)
+
+try {
+  // Make the local replica the follower
+  val followerTopicsDelta = topicsCreateDelta(localId, false)
+  val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+  replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+  // Check the state of that partition and fetcher
+  val HostedPartition.Online(followerPartition) = 
replicaManager.getPartition(topicPartition)
+  assertFalse(followerPartition.isLeader)
+  assertEquals(0, 

[GitHub] [kafka] OmniaGM opened a new pull request #11220: KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention

2021-08-16 Thread GitBox


OmniaGM opened a new pull request #11220:
URL: https://github.com/apache/kafka/pull/11220


   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #11219: MINOR: Forward list partition reassignments

2021-08-16 Thread GitBox


rondagostino commented on pull request #11219:
URL: https://github.com/apache/kafka/pull/11219#issuecomment-899637052


   Actually, I see now that this is simply fixing the error message.  LGTM 
as-is.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] xdgrulez commented on pull request #10897: MINOR: Reduced severity for "skipping records" falling out of time windows

2021-08-16 Thread GitBox


xdgrulez commented on pull request #10897:
URL: https://github.com/apache/kafka/pull/10897#issuecomment-899621818


   Hi,
   
   I've fixed the tests now - on my local machine, the complete :streams:test 
suite is now working perfectly fine again :)
   
   Best regards,
   Ralph
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #11219: MINOR: Forward list partition reassignments

2021-08-16 Thread GitBox


rondagostino commented on pull request #11219:
URL: https://github.com/apache/kafka/pull/11219#issuecomment-899567795


   Good catch.  I wonder if this should be fixed for 3.0 -- i.e. should it be a 
blocker?  Also, is there a way to close the testing gap that allowed this to 
exist -- maybe have `reassign_partitions_test.py` list the partition 
reassignments at least once?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2021-08-16 Thread Eran Levy (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399752#comment-17399752
 ] 

Eran Levy commented on KAFKA-10643:
---

Actually, I think we never solved it, but it reduced a lot for some reason. I 
can't point my finger at any specific change that resolved it.

We made some DNS changes that might have resolved a network issue, but I doubt 
that was it. 

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume because of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> I looked a bit at the Kafka code and I'm not sure I get why such a thing is 
> happening - does this line describe the situation that happens here re the 
> "reason:"?[https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also don't see it happening too often in other Kafka Streams applications 
> that we have. 
> The only suspicious thing I see is that around every hour different pods of 
> that Kafka Streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after I started to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to get stuck - the only thing I found is that maybe, because of 
> these too-frequent PreparingRebalance states, the app might be affected by 
> this bug - KAFKA-9752 ?
> I don't understand why it happens; it didn't happen before I applied static 
> membership to that Kafka Streams application (since around 2 weeks ago). 
> Will be happy if you can help me
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on pull request #11219: MINOR: Forward list partition reassignments

2021-08-16 Thread GitBox


dengziming commented on pull request #11219:
URL: https://github.com/apache/kafka/pull/11219#issuecomment-899490789


   ping @rondagostino @cmccabe 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming opened a new pull request #11219: MINOR: Forward list partition reassignments

2021-08-16 Thread GitBox


dengziming opened a new pull request #11219:
URL: https://github.com/apache/kafka/pull/11219


   *More detailed description of your change*
   We already support list partition reassignments in KRAFT mode, so change 
`notYetSupported` to `shouldAlwaysForward`
   
   *Summary of testing strategy (including rationale)*
   Unit test
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-13205) Clarify API specification of Kafka Connect endpoint

2021-08-16 Thread Jonathan Kaleve (Jira)
Jonathan Kaleve created KAFKA-13205:
---

 Summary: Clarify API specification of Kafka Connect endpoint
 Key: KAFKA-13205
 URL: https://issues.apache.org/jira/browse/KAFKA-13205
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.7.0
Reporter: Jonathan Kaleve


Since version 2.5, Kafka Connect has exposed an endpoint for getting all topics 
related to a connector via its REST API (see [the original 
KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect]).


While the original KIP proposed the response payload to look as follows: 
{code:java}
{
  "some-source": {
"topics": [
  "foo",
  "bar",
  "baz",   
]
  }
}
{code}

The documentation by Confluent states the same: 
[https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-topics]


The [actual 
Code|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L198],
 however, produces a result of the following form:
{code:java}
{
  "some-source": {
"connector": "some-source",
"topics": [
  "foo",
  "bar",
  "baz",   
]
  }
}
{code}
 
This poses a problem for some applications (like the [Strimzi 
Operator|https://github.com/strimzi/strimzi-kafka-operator]) since they expect 
the response to be of type {{Map>>}} (see 
[here|https://github.com/strimzi/strimzi-kafka-operator/blob/61a2301390fb9ecc87feb1925d0c2d2f1b2f8107/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectApiImpl.java#L610]),
 but in Kafka Connect the return type is actually {{Map>}} (see [this 
test|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java#L884],
 for example).
Which type should be expected here? Is the endpoint intended to return the type 
documented by Confluent, or the type that is actually present in the code? 

Also: I might be overlooking something, but it seems there is no official 
documentation of the specific API endpoints of Kafka Connect. Is that correct?
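
For what it's worth, a client can tolerate both shapes by parsing the values 
into a generic structure instead of a fixed nested-map type. A minimal Jackson 
sketch (hypothetical, not Strimzi's or Connect's actual code):
{code:java}
import java.util.List;
import java.util.Map;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class TopicsPayloadSketch {
    public static void main(String[] args) throws Exception {
        // The payload shape actually produced, including the extra "connector" field.
        String body = "{\"some-source\": {\"connector\": \"some-source\","
            + " \"topics\": [\"foo\", \"bar\", \"baz\"]}}";
        ObjectMapper mapper = new ObjectMapper();
        // Read values as Map<String, Object> so the extra field does not break typing.
        Map<String, Map<String, Object>> parsed = mapper.readValue(
            body, new TypeReference<Map<String, Map<String, Object>>>() { });
        @SuppressWarnings("unchecked")
        List<String> topics = (List<String>) parsed.get("some-source").get("topics");
        System.out.println(topics); // [foo, bar, baz]
    }
}
{code}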



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13205) Clarify API specification of Kafka Connect endpoint

2021-08-16 Thread Jonathan Kaleve (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Kaleve updated KAFKA-13205:

Description: 
Since version 2.5, Kafka Connect has exposed an endpoint for getting all topics 
related to a connector via its REST API (see [the original 
KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect]).

While the original KIP proposed the response payload to look as follows: 
{code:java}
{
  "some-source": {
"topics": [
  "foo",
  "bar",
  "baz",   
]
  }
}
{code}
The documentation by Confluent states the same: 
[https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-topics]

The [actual 
Code|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L198],
 however, produces a result of the following form:
{code:java}
{
  "some-source": {
"connector": "some-source",
"topics": [
  "foo",
  "bar",
  "baz",   
]
  }
}
{code}
 
 This poses a problem for some applications (like the [Strimzi 
Operator|https://github.com/strimzi/strimzi-kafka-operator]) since they expect 
the response to be of type {{Map>>}} (see 
[here|https://github.com/strimzi/strimzi-kafka-operator/blob/61a2301390fb9ecc87feb1925d0c2d2f1b2f8107/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectApiImpl.java#L610]),
 but in Kafka Connect the return type is actually {{Map>}} (see [this 
test|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java#L884],
 for example).
 Which type should be expected here? Is the endpoint intended to return the 
type documented by Confluent, or the type that is actually present in the code? 

Also: I might be overlooking something, but it seems there is no official 
documentation of the specific API endpoints of Kafka Connect. Is that correct?

  was:
Since version 2.5, Kafka Connect has exposed an endpoint for getting all topics 
related to a connector via its REST API (see [the original 
KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect]).


While the original KIP proposed the response payload to look as follows: 
{code:java}
{
  "some-source": {
"topics": [
  "foo",
  "bar",
  "baz",   
]
  }
}
{code}

The documentation by Confluent states the same: 
[https://docs.confluent.io/platform/current/connect/references/restapi.html#get--connectors-(string-name)-topics]


The [actual 
Code|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java#L198],
 however, produces a result of the following form:
{code:java}
{
  "some-source": {
"connector": "some-source",
"topics": [
  "foo",
  "bar",
  "baz",   
]
  }
}
{code}
 
This poses a problem for some applications (like the [Strimzi 
Operator|https://github.com/strimzi/strimzi-kafka-operator]) since they expect 
the response to be of type {{Map>>}} (see 
[here|https://github.com/strimzi/strimzi-kafka-operator/blob/61a2301390fb9ecc87feb1925d0c2d2f1b2f8107/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/KafkaConnectApiImpl.java#L610]),
 but in Kafka Connect the return type is actually {{Map>}} (see [this 
test|https://github.com/apache/kafka/blob/2.7/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java#L884],
 for example).
Which type should be expected here? Is the endpoint intended to return the type 
documented by Confluent, or the type that is actually present in the code? 

Also: I might be overlooking something, but it seems there is no official 
documentation of the specific API endpoints of Kafka Connect. Is that correct?


> Clarify API specification of Kafka Connect endpoint
> ---
>
> Key: KAFKA-13205
> URL: https://issues.apache.org/jira/browse/KAFKA-13205
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.7.0
>Reporter: Jonathan Kaleve
>Priority: Major
>
> Since version 2.5, Kafka Connect has exposed an endpoint for getting all topics 
> related to a connector via its REST API (see [the original 
> KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect]).
> While the original KIP proposed the response payload to look as follows: 
> {code:java}
> {
>   "some-source": {
> "topics": [
>  

[jira] [Comment Edited] (KAFKA-10643) Static membership - repetitive PreparingRebalance with updating metadata for member reason

2021-08-16 Thread Maatari (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10643?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399346#comment-17399346
 ] 

Maatari edited comment on KAFKA-10643 at 8/16/21, 11:44 AM:


Hi ([~cadonna] [~ableegoldman] [~eran-levy]), I wonder if there is any news on 
this. I'm having the same issue. I'm using Kafka 2.8 (Confluent 6.2.0, more 
specifically) with static membership, and I am running into the exact same 
issue. Although my session.timeout.ms and max.poll.record.ms are both set to an 
hour, before I hit either of those timeouts I always experience a rebalance 
kicking in, because of either a leader re-joining or a metadata update. In each 
case, once the rebalance starts, every member is first removed and re-joins, but 
then the group never properly stabilizes. Somehow my Kafka Streams application 
ends up progressing, but never at full capacity. I never see the group become 
stable again in the coordinator's log; however, I can see members rejoining 
intermittently, and in the meantime the application progresses. 

Generally speaking, what is the status of this? Is it being worked on? Is 
there any workaround? [~eran-levy], how did you manage to work around or solve 
this?


was (Author: maatdeamon):
Hi, I wonder if there is any news on this. I'm having the same issue. I'm using 
Kafka 2.8 (Confluent 6.2.0, more specifically) with static membership, and I am 
running into the exact same issue. Although my session.timeout.ms and 
max.poll.record.ms are both set to an hour, before I hit either of those 
timeouts I always experience a rebalance kicking in, because of either a leader 
re-joining or a metadata update. In each case, once the rebalance starts, every 
member is first removed and re-joins, but then the group never properly 
stabilizes. Somehow my Kafka Streams application ends up progressing, 
but never at full capacity. I never see the group become stable again in the 
coordinator's log; however, I can see members rejoining intermittently, 
and in the meantime the application progresses. 

Generally speaking, what is the status of this? Is it being worked on? Is 
there any workaround? [~eran-levy], how did you manage to work around or solve 
this?

> Static membership - repetitive PreparingRebalance with updating metadata for 
> member reason
> --
>
> Key: KAFKA-10643
> URL: https://issues.apache.org/jira/browse/KAFKA-10643
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Eran Levy
>Priority: Major
> Attachments: broker-4-11.csv, client-4-11.csv, 
> client-d-9-11-11-2020.csv
>
>
> Kafka streams 2.6.0, brokers version 2.6.0. Kafka nodes are healthy, kafka 
> streams app is healthy. 
> Configured with static membership. 
> Every 10 minutes (I assume because of topic.metadata.refresh.interval.ms), I 
> see the following group coordinator log for different stream consumers: 
> INFO [GroupCoordinator 2]: Preparing to rebalance group **--**-stream in 
> state PreparingRebalance with old generation 12244 (__consumer_offsets-45) 
> (reason: Updating metadata for member 
> -stream-11-1-013edd56-ed93-4370-b07c-1c29fbe72c9a) 
> (kafka.coordinator.group.GroupCoordinator)
> and right after that the following log: 
> INFO [GroupCoordinator 2]: Assignment received from leader for group 
> **-**-stream for generation 12246 (kafka.coordinator.group.GroupCoordinator)
>  
> I looked a bit at the Kafka code and I'm not sure I get why such a thing is 
> happening - does this line describe the situation that happens here re the 
> "reason:"? [https://github.com/apache/kafka/blob/7ca299b8c0f2f3256c40b694078e422350c20d19/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L311]
> I also don't see it happening too often in other Kafka Streams applications 
> that we have. 
> The only suspicious thing I see is that around every hour different pods of 
> that Kafka Streams application throw this exception: 
> {"timestamp":"2020-10-25T06:44:20.414Z","level":"INFO","thread":"**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1","logger":"org.apache.kafka.clients.FetchSessionHandler","message":"[Consumer
>  
> clientId=**-**-stream-94561945-4191-4a07-ac1b-07b27e044402-StreamThread-1-restore-consumer,
>  groupId=null] Error sending fetch request (sessionId=34683236, epoch=2872) 
> to node 
> 3:","context":"default","exception":"org.apache.kafka.common.errors.DisconnectException:
>  null\n"}
> I came across this strange behaviour after starting to investigate a strange 
> stuck rebalancing state after one of the members left the group and caused 
> the rebalance to get stuck - the only thing that I found is that maybe because 
> that too 

[GitHub] [kafka] mimaison commented on pull request #11212: KAFKA-13200: Fix MirrorMaker2 connector version

2021-08-16 Thread GitBox


mimaison commented on pull request #11212:
URL: https://github.com/apache/kafka/pull/11212#issuecomment-899357757


   @showuon Good point, updated!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

2021-08-16 Thread GitBox


showuon commented on pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#issuecomment-899357307


   @guozhangwang @ableegoldman , please take a look when available. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

2021-08-16 Thread GitBox


showuon commented on a change in pull request #11218:
URL: https://github.com/apache/kafka/pull/11218#discussion_r689370424



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
##
@@ -597,28 +603,32 @@ private void updateGroupSubscription(Set<String> topics) {
         // own metadata with the newly added topics so that it will not trigger a subsequent rebalance
         // when these topics gets updated from metadata refresh.
         //
+        // We skip the check for in-product assignors since this will not happen in in-product assignors.
+        //
         // TODO: this is a hack and not something we want to support long-term unless we push regex into the protocol
         //       we may need to modify the ConsumerPartitionAssignor API to better support this case.
-        Set<String> assignedTopics = new HashSet<>();
-        for (Assignment assigned : assignments.values()) {
-            for (TopicPartition tp : assigned.partitions())
-                assignedTopics.add(tp.topic());
-        }
+        if (!isInProductAssignor(assignor.name())) {
+            Set<String> assignedTopics = new HashSet<>();
+            for (Assignment assigned : assignments.values()) {
+                for (TopicPartition tp : assigned.partitions())
+                    assignedTopics.add(tp.topic());
+            }

-        if (!assignedTopics.containsAll(allSubscribedTopics)) {
-            Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
-            notAssignedTopics.removeAll(assignedTopics);
-            log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
-        }
+            if (!assignedTopics.containsAll(allSubscribedTopics)) {
+                Set<String> notAssignedTopics = new HashSet<>(allSubscribedTopics);
+                notAssignedTopics.removeAll(assignedTopics);
+                log.warn("The following subscribed topics are not assigned to any members: {} ", notAssignedTopics);
+            }

-        if (!allSubscribedTopics.containsAll(assignedTopics)) {
-            Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
-            newlyAddedTopics.removeAll(allSubscribedTopics);
-            log.info("The following not-subscribed topics are assigned, and their metadata will be " +
-                "fetched from the brokers: {}", newlyAddedTopics);
+            if (!allSubscribedTopics.containsAll(assignedTopics)) {
+                Set<String> newlyAddedTopics = new HashSet<>(assignedTopics);
+                newlyAddedTopics.removeAll(allSubscribedTopics);
+                log.info("The following not-subscribed topics are assigned, and their metadata will be " +
+                    "fetched from the brokers: {}", newlyAddedTopics);

-                allSubscribedTopics.addAll(assignedTopics);
-                updateGroupSubscription(allSubscribedTopics);
+                allSubscribedTopics.addAll(newlyAddedTopics);

Review comment:
   Side fix: we can just add the `newlyAddedTopics` here because we already 
computed it as `assignedTopics - allSubscribedTopics`. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #11218: MINOR: optimize performAssignment to skip unnecessary check

2021-08-16 Thread GitBox


showuon opened a new pull request #11218:
URL: https://github.com/apache/kafka/pull/11218


   Found this while reading the code. We do a somewhat heavy check each time 
after performing assignment: we compare the "assigned topics" set against the 
"subscribed topics" set to see whether either set contains topics missing from 
the other. Also, the "assigned topics" set is created by traversing all the 
assigned partitions, which gets expensive when partition counts are large. 
   
   However, as the comments describe, this is a safeguard for user-customized 
assignors, which might produce assignments we don't expect. In most cases, 
users will just use an in-product assignor, which guarantees that we only 
assign topics from the subscribed topics. Therefore, this check is not needed 
for in-product assignors; a sketch of the guard follows below.
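   
   For illustration, a minimal sketch of the new guard (the helper body below 
is an assumption for this write-up, not necessarily the exact patch): an 
assignor counts as "in-product" when its name matches one of the assignors 
shipped with the client.
   
   ```java
   import java.util.Arrays;
   import java.util.HashSet;
   import java.util.Set;
   
   final class AssignorGuard {
       // Names reported by RangeAssignor, RoundRobinAssignor, StickyAssignor
       // and CooperativeStickyAssignor respectively.
       private static final Set<String> IN_PRODUCT_ASSIGNOR_NAMES =
           new HashSet<>(Arrays.asList("range", "roundrobin", "sticky", "cooperative-sticky"));
   
       // Hypothetical helper: the PR's actual implementation may differ.
       static boolean isInProductAssignor(String assignorName) {
           return IN_PRODUCT_ASSIGNOR_NAMES.contains(assignorName);
       }
   }
   ```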
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #11217: KAFKA-13204: assignor name conflict check

2021-08-16 Thread GitBox


showuon commented on pull request #11217:
URL: https://github.com/apache/kafka/pull/11217#issuecomment-899322956


   @ableegoldman , please take a look when available. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon opened a new pull request #11217: KAFKA-13204: assignor name conflict check

2021-08-16 Thread GitBox


showuon opened a new pull request #11217:
URL: https://github.com/apache/kafka/pull/11217


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-08-16 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399588#comment-17399588
 ] 

Luke Chen commented on KAFKA-13081:
---

My PR is ready for [~ableegoldman]'s review. I think the review should be fast 
since it's just a backport PR.

> Port sticky assignor fixes (KAFKA-12984) back to 2.8
> 
>
> Key: KAFKA-13081
> URL: https://issues.apache.org/jira/browse/KAFKA-13081
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 2.8.1
>
>
> We should make sure that fix #1 and #2 of 
> [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
> sticky assignor, since it's pretty much impossible to smoothly cherrypick 
> that commit from 3.0 to 2.8 due to all the recent improvements and 
> refactoring in the AbstractStickyAssignor. Either we can just extract and 
> apply those two fixes to 2.8 directly, or go back and port all the commits 
> that made this cherrypick difficult over to 2.8 as well. If we do so then 
> cherrypicking the original commit should be easy.
> See [this 
> comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13081) Port sticky assignor fixes (KAFKA-12984) back to 2.8

2021-08-16 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399566#comment-17399566
 ] 

David Jacot commented on KAFKA-13081:
-

[~ableegoldman] [~showuon] This is the only remaining blocker for 2.8.1. Would 
you already have an ETA for it?

> Port sticky assignor fixes (KAFKA-12984) back to 2.8
> 
>
> Key: KAFKA-13081
> URL: https://issues.apache.org/jira/browse/KAFKA-13081
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Blocker
> Fix For: 2.8.1
>
>
> We should make sure that fix #1 and #2 of 
> [#10985|https://github.com/apache/kafka/pull/10985] make it back to the 2.8 
> sticky assignor, since it's pretty much impossible to smoothly cherrypick 
> that commit from 3.0 to 2.8 due to all the recent improvements and 
> refactoring in the AbstractStickyAssignor. Either we can just extract and 
> apply those two fixes to 2.8 directly, or go back and port all the commits 
> that made this cherrypick difficult over to 2.8 as well. If we do so then 
> cherrypicking the original commit should be easy.
> See [this 
> comment|https://github.com/apache/kafka/pull/10985#issuecomment-879521196]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13204) wrong assignor selected if the assignor name is identical

2021-08-16 Thread Luke Chen (Jira)
Luke Chen created KAFKA-13204:
-

 Summary: wrong assignor selected if the assignor name is identical
 Key: KAFKA-13204
 URL: https://issues.apache.org/jira/browse/KAFKA-13204
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.8.0
Reporter: Luke Chen
Assignee: Luke Chen


We use the partition assignor name to identify which assignor to use in the 
consumer coordinator, but we don't do any assignor name conflict check, which 
can cause a wrong-assignor-selected issue.
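
A minimal sketch of the kind of check this implies (helper name and placement 
are assumptions, not the actual patch):

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;

final class AssignorValidator {
    // Fail fast at client construction time if two configured assignors share a name.
    static void validateUniqueNames(List<ConsumerPartitionAssignor> assignors) {
        Set<String> names = new HashSet<>();
        for (ConsumerPartitionAssignor assignor : assignors) {
            if (!names.add(assignor.name()))
                throw new IllegalArgumentException(
                    "Multiple partition assignors share the name: " + assignor.name());
        }
    }
}
{code}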



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-13204) wrong assignor selected if the assignor name is identical

2021-08-16 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen updated KAFKA-13204:
--
Description: We use the partition assignor name to identify which assignor to 
use in the consumer coordinator, but we don't do any assignor name conflict 
check, which can cause the wrong assignor to be selected when performing 
assignment.  (was: We use the partition assignor name to identify which 
assignor to use in the consumer coordinator, but we don't do any assignor name 
conflict check, which can cause a wrong-assignor-selected issue.)

> wrong assignor selected if the assignor name is identical
> -
>
> Key: KAFKA-13204
> URL: https://issues.apache.org/jira/browse/KAFKA-13204
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.8.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
>
> We use the partition assignor name to identify which assignor to use in the 
> consumer coordinator, but we don't do any assignor name conflict check, 
> which can cause the wrong assignor to be selected when performing assignment.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-13195) StateSerde don't honor DeserializationExceptionHandler

2021-08-16 Thread Ludo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17399536#comment-17399536
 ] 

Ludo commented on KAFKA-13195:
--

Thanks [~mjsax] for all the feedback. 

I understand your point of view. About the options you describe: 
 * A custom deserializer can be an option for simple POJOs (meaning a simple 
application with simple POJOs embedded in the application). In my case it's a 
[plugins based|https://kestra.io/docs/plugin-developer-guide/] solution, and I 
need to be resilient to external changes that I don't know about: if a plugin 
breaks one message, the app must continue processing all the others. 
 * Using a Transformer, I don't really get it here. Looking at the stack trace, 
I don't know how to catch this exception on my side; the only way (as I see 
it) to catch it is with a [Custom 
Deserializer|https://github.com/kestra-io/kestra/blob/b55ac08ba9661b7803cfd7ac4116f4c93c9f29ec/runner-kafka/src/main/java/io/kestra/runner/kafka/serializers/JsonDeserializer.java#L32],
 but I would only be able to return a null value there (see the sketch below), 
and I'm not really sure the underlying part of the state store will not crash.
 * Resetting the application is also not an option, since in my case the 
deserialization will fail even with reprocessing (still no idea what the 
plugins have done).

I agree with you that this is not a bug given your explanation, but maybe 
another option to handle it, like a 
{{StreamsConfig.}}{{DEFAULT_STATE_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}},
 with the same behavior as for input topics, would be fine? 

In the real world, a streams application must never crash, and giving users 
control over how to handle invalid data in a state store (skip, dead letter 
queue, delete from state store ...) is a real need in my opinion. 

What do you think?
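
For context, a minimal sketch of that "return null on error" deserializer 
(class name and JSON mapping are illustrative; this is a simplified stand-in, 
not the actual Kestra code):

{code:java}
import java.io.IOException;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

public class NullOnErrorJsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> type;

    public NullOnErrorJsonDeserializer(Class<T> type) {
        this.type = type;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            return mapper.readValue(data, type);
        } catch (IOException e) {
            // Swallow the error and return null; whether the downstream state
            // store code tolerates the null is exactly the open question above.
            return null;
        }
    }
}
{code}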

 

> StateSerde don't honor DeserializationExceptionHandler
> --
>
> Key: KAFKA-13195
> URL: https://issues.apache.org/jira/browse/KAFKA-13195
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Ludo
>Priority: Major
>
> Kafka Streams allows configuring a 
> [DeserializationExceptionHandler|https://docs.confluent.io/platform/current/streams/faq.html#failure-and-exception-handling].
>  
> When you are using a StateStore, most messages will be a copy of the original 
> message in an internal topic and will mostly use the same serializer, even if 
> the message is of another type. 
> You can see 
> [here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java#L159-L161]
>  that StateSerdes uses the raw Deserializer and does not honor the 
> {{StreamsConfig.}}{{DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG}}.
> This leads to crashing the application (reaching the 
> {{setUncaughtExceptionHandler}} method).
> I think the state store must have the same behavior as the 
> {{RecordDeserializer}} and honor the DeserializationExceptionHandler.
>  
> Stacktrace (coming from kafka stream 2.6.1) :
>  
> {code:java}
> Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !Uncaught exception in Kafka Stream kestra_executor-StreamThread-1, closing 
> !org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_14, processor=workertaskjoined-repartition-source, 
> topic=kestra_executor-workertaskjoined-repartition, partition=14, 
> offset=167500, 
> stacktrace=org.apache.kafka.common.errors.SerializationException: 
> com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize 
> value of type `io.kestra.plugin.gcp.bigquery.ExtractToGcs$Format` from String 
> "txt": not one of the values accepted for Enum class: 
> [NEWLINE_DELIMITED_JSON, AVRO, PARQUET, CSV] at [Source: 
> (byte[])"{[truncated 1270 bytes]; line: 1, column: 1187] (through 
> reference chain: 
> io.kestra.core.models.flows.Flow["tasks"]->java.util.ArrayList[1]->org.kestra.task.gcp.bigquery.ExtractToGcs["format"])
>  at 
> com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1851)
>  at 
> com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:1079)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._deserializeAltString(EnumDeserializer.java:327)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer._fromString(EnumDeserializer.java:214)
>  at 
> com.fasterxml.jackson.databind.deser.std.EnumDeserializer.deserialize(EnumDeserializer.java:188)
>  at 
> com.fasterxml.jackson.databind.deser.impl.FieldProperty.deserializeAndSet(FieldProperty.java:138)
>  at 
> 

[GitHub] [kafka] lkokhreidze commented on pull request #10851: KAFKA-6718 / Rack aware standby task assignor

2021-08-16 Thread GitBox


lkokhreidze commented on pull request #10851:
URL: https://github.com/apache/kafka/pull/10851#issuecomment-899245208


   Hi @cadonna !
   Thanks for the feedback. I will address your comments this week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org