[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414654696



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -111,8 +115,9 @@
 private final String storeName = "store";
 
 private AtomicBoolean errorInjected;
-private AtomicBoolean gcInjected;
-private volatile boolean doGC = true;
+private AtomicBoolean stallInjected;

Review comment:
   This is another case where I've gotten tripped up by the same thing 
twice, and decided to fix it this time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414655837



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##
@@ -41,8 +42,8 @@
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
 public final class AssignorConfiguration {
-public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = "internal.high.availability.enabled";
-private final boolean highAvailabilityEnabled;
+public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";

Review comment:
   Oh, yeah, good idea.









[GitHub] [kafka] hachikuji commented on pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-24 Thread GitBox


hachikuji commented on pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-619100794


   retest this please







[GitHub] [kafka] hachikuji commented on pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-24 Thread GitBox


hachikuji commented on pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-619100923


   ok to test







[GitHub] [kafka] junrao commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-24 Thread GitBox


junrao commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-619129858


   @steverod : There seem to be compilation errors in the JDK 8 tests?







[GitHub] [kafka] ijuma commented on pull request #8545: MINOR: smoke test for jmh benchmark functionality

2020-04-24 Thread GitBox


ijuma commented on pull request #8545:
URL: https://github.com/apache/kafka/pull/8545#issuecomment-619055845


   ok to test







[GitHub] [kafka] chia7712 commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-24 Thread GitBox


chia7712 commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-619070458


   Is this a duplicate of #8542?







[GitHub] [kafka] junrao commented on a change in pull request #8103: KAFKA-7061: KIP-280 Enhanced log compaction

2020-04-24 Thread GitBox


junrao commented on a change in pull request #8103:
URL: https://github.com/apache/kafka/pull/8103#discussion_r414175405



##
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##
@@ -20,28 +20,80 @@ package kafka.log
 import java.util.Arrays
 import java.security.MessageDigest
 import java.nio.ByteBuffer
+
+import kafka.log.CompactionStrategy.CompactionStrategy
 import kafka.utils._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.utils.{ByteUtils, Utils}
 
 trait OffsetMap {
+  /* The maximum number of entries this map can contain */
   def slots: Int
-  def put(key: ByteBuffer, offset: Long): Unit
-  def get(key: ByteBuffer): Long
+
+  /* Initialize the map with the topic compact strategy */
+  def init(strategy: String, headerKey: String, cleanerThreadId: Int, topicPartitionName: String)

Review comment:
   Since this is called in every round of cleaning, perhaps it should be 
called reinitialize()?

##
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
 
   /**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+   * This evaluates to the number of bytes in the hash plus 8 bytes for the offset

Review comment:
   We probably don't need the comment on the line above now?
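   To make the entry sizing in this hunk concrete, here is a small Java sketch of the arithmetic (not the Scala `SkimpyOffsetMap` itself): each slot holds the key hash plus an 8-byte offset, plus one more 8-byte version when a non-offset strategy is configured, and the slot count is the memory budget divided by that entry size. `OffsetMapSizing`, its methods, and the 1 MiB budget are illustrative assumptions.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper that mirrors the SkimpyOffsetMap sizing arithmetic; not Kafka code.
final class OffsetMapSizing {
    // One 8-byte long per stored offset (and per stored version).
    private static final int LONG_BYTES = Long.BYTES;

    // Bytes per slot: the key hash, an 8-byte offset and, for a non-offset
    // compaction strategy (as proposed in the PR), an extra 8-byte version.
    static int bytesPerEntry(int hashSize, boolean nonOffsetStrategy) {
        return hashSize + LONG_BYTES + (nonOffsetStrategy ? LONG_BYTES : 0);
    }

    // Number of whole slots that fit into the memory budget (integer division).
    static int slots(int memory, int bytesPerEntry) {
        return memory / bytesPerEntry;
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // MD5, the SkimpyOffsetMap default, produces a 16-byte digest.
        int md5 = MessageDigest.getInstance("MD5").getDigestLength();
        int memory = 1024 * 1024;
        int offsetOnly = bytesPerEntry(md5, false);  // 16 + 8 = 24
        int withVersion = bytesPerEntry(md5, true);  // 16 + 8 + 8 = 32
        System.out.println(slots(memory, offsetOnly) + " slots vs " + slots(memory, withVersion) + " slots");
    }
}
```

   Run as-is, it prints `43690 slots vs 32768 slots`: the extra version field costs a quarter of the capacity for the same memory, which is presumably why `bytesPerEntry` and `slots` become `var`s that are set in `init`.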

##
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##
@@ -20,28 +20,80 @@ package kafka.log
 import java.util.Arrays
 import java.security.MessageDigest
 import java.nio.ByteBuffer
+
+import kafka.log.CompactionStrategy.CompactionStrategy
 import kafka.utils._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.utils.{ByteUtils, Utils}
 
 trait OffsetMap {
+  /* The maximum number of entries this map can contain */
   def slots: Int
-  def put(key: ByteBuffer, offset: Long): Unit
-  def get(key: ByteBuffer): Long
+
+  /* Initialize the map with the topic compact strategy */
+  def init(strategy: String, headerKey: String, cleanerThreadId: Int, topicPartitionName: String)
+
+  /**
+   * Associate this offset to the given key.
+   * @param record The record
+   * @return success flag
+   */
+  def put(record: Record): Boolean
+
+  /**
+   * Checks to see whether to retain the record or not
+   * @param record The record
+   * @return true to retain; false not to
+   */
+  def shouldRetainRecord(record: Record): Boolean
+
+  /**
+   * Get the offset associated with this key.
+   * @param key The key
+   * @return The offset associated with this key or -1 if the key is not found
+   */
+  def getOffset(key: ByteBuffer): Long
+
+  /**
+   * Get the version associated with this key for non-offset based strategy.
+   * @param key The key
+   * @return The version associated with this key or -1 if the key is not found
+   */
+  def getVersion(key: ByteBuffer): Long
+
+  /**
+   * Sets the passed value as the latest offset.
+   * @param offset the latest offset
+   */
   def updateLatestOffset(offset: Long): Unit
-  def clear(): Unit
+
+  /* The number of entries put into the map (note that not all may remain) */

Review comment:
   Hmm, not sure that I understand "note that not all may remain".

##
File path: core/src/main/scala/kafka/log/OffsetMap.scala
##
@@ -68,90 +120,164 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
 
   /**
* The number of bytes of space each entry uses (the number of bytes in the hash plus an 8 byte offset)
+   * This evaluates to the number of bytes in the hash plus 8 bytes for the offset
+   * and, if applicable, another 8 bytes for non-offset compact strategy (set in the init method).
*/
-  val bytesPerEntry = hashSize + 8
-  
+  var bytesPerEntry = hashSize + longByteSize
+
   /**
* The maximum number of entries this map can contain
*/
-  val slots: Int = memory / bytesPerEntry
+  var slots: Int = memory / bytesPerEntry
+
+  /* compact strategy */
+  private var compactionStrategy: CompactionStrategy = null
+
+  /* header key for the Strategy header to look for */
+  private var headerKey: String = ""
+
+  /**
+   * Initialize the map with the topic compact strategy
+   * @param strategy The compaction strategy
+   * @param headerKey The header key if the compaction strategy is set to header
+   * @param cleanerThreadId The cleaner thread id
+   * @param topicPartitionName The topic partition name
+   */
+  override def init(strategy: String = Defaults.CompactionStrategyOffset, headerKey: String = "", cleanerThreadId: Int = -1, topicPartitionName: String = "") {
+// set the log indent for the topic partition
+this.logIdent = s"[OffsetMap-$cleanerThreadId $topicPartitionName]: "
+
+// Change the salt used for key hashing making all existing 

[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414651483



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set allSourceTopics,
 allTasks, clientStates, numStandbyReplicas());
 
 final TaskAssignor taskAssignor;
-if (highAvailabilityEnabled) {
-if (lagComputationSuccessful) {
-taskAssignor = new HighAvailabilityTaskAssignor(
-clientStates,
-allTasks,
-statefulTasks,
-assignmentConfigs);
-} else {
-log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
- + "trigger another rebalance to retry.");
-setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-}
+if (!lagComputationSuccessful) {

Review comment:
   This is a good thought. What mitigates the downside, I think, is that we 
still assign all the tasks when we fail to fetch lags, so it's not like we make 
no progress while waiting for the next rebalance. The "endless cycle" is a 
concern, but I'm not sure how it could happen in practice. I.e., what would 
make brokers consistently fail to report end offsets, but _not_ fail on any 
other APIs that Streams needs, especially since Streams needs to query the 
end-offset API during restoration anyway?
   
   It seems like the failure would either be transient or permanent(ish).
   
   If transient, then Streams will make progress during the 
probing.rebalance.interval and succeed in balancing the assignment later. Even 
if we get further transient exceptions _during_ the sequence of HATA probing 
rebalances, the fact that we just return all tasks to their prior owners and 
that the HATA is stable means that we only delay convergence by a single 
probing.rebalance.interval rather than starting all over again.
   
   If permanent, then Streams will fail anyway _after_ the assignment 
completes, since it also tends to query the end offsets immediately after 
getting the assignment. Even if it gets all prior tasks returned, which would 
make it skip the restoration phase, it seems implausible that we'd see a 
permanent failure on _only_ the end-offset API while Streams is still happily 
able to poll, commit, manage transactions, etc.
   
   Our big alternative is just to immediately raise the exception and leave it 
to KIP-572 to deal with the situation holistically. But I'm concerned that the 
impact of bombing out of assignment is greater than that of handling other 
failures during processing. It seems like an exception in assignment dooms the 
current Join/SyncGroup phase for everyone, which means that they have to wait 
for a timeout and then redo the rebalance. So KIP-572 can still recover 
gracefully by reconstructing the consumer, but it can't avoid the extra 
downtime of waiting for the failed rebalance to time out and trying again.
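   The trade-off argued in this comment, keeping every task with its previous owner and requesting another rebalance instead of throwing when the end-offset lookup fails, fits in a few lines. This is a hypothetical simplification for illustration: `LagFallbackSketch`, `AssignmentOutcome`, and the task-to-client maps are not the `StreamsPartitionAssignor` API.

```java
import java.util.Map;

// Hypothetical simplification of the lag-fetch fallback; not Kafka code.
final class LagFallbackSketch {

    // Result of one assignment round: task -> client, plus whether a follow-up
    // rebalance should be requested (analogous to REBALANCE_NEEDED).
    static final class AssignmentOutcome {
        final Map<String, String> assignment;
        final boolean rebalanceNeeded;

        AssignmentOutcome(Map<String, String> assignment, boolean rebalanceNeeded) {
            this.assignment = assignment;
            this.rebalanceNeeded = rebalanceNeeded;
        }
    }

    // If lag computation failed, return every task to its prior owner so that
    // processing continues, and ask for another rebalance to retry the lag fetch.
    // Otherwise, use the balanced assignment proposed by the configured assignor.
    static AssignmentOutcome decide(boolean lagComputationSuccessful,
                                    Map<String, String> previousAssignment,
                                    Map<String, String> proposedAssignment) {
        if (!lagComputationSuccessful) {
            return new AssignmentOutcome(previousAssignment, true);
        }
        return new AssignmentOutcome(proposedAssignment, false);
    }
}
```

   Returning the previous assignment rather than nothing is what keeps a transient failure from stalling progress: at worst, convergence is delayed by one extra rebalance.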









[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414662116



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##
@@ -41,8 +42,8 @@
 import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
 public final class AssignorConfiguration {
-public static final String HIGH_AVAILABILITY_ENABLED_CONFIG = "internal.high.availability.enabled";
-private final boolean highAvailabilityEnabled;
+public static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";

Review comment:
   Ok, I moved it to 
`org.apache.kafka.streams.StreamsConfig.InternalConfig#INTERNAL_TASK_ASSIGNOR_CLASS`.
 I made an ad-hoc decision not to add the underscores, though, because this 
config is different from the other internal configs. I added comments to 
InternalConfig to explain the difference.
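   One common way to consume a class-valued config such as `internal.task.assignor.class` is to load the named class reflectively, check its type, and instantiate it, falling back to a default when the key is absent. The sketch below assumes a no-arg constructor and uses hypothetical `Assignor`, `DefaultAssignor`, and `AlternateAssignor` types; it is not how `StreamsConfig` or the Streams assignor actually resolve the value.

```java
import java.util.Map;

// Hypothetical illustration of resolving a class-name config; not Kafka code.
final class AssignorConfigSketch {
    static final String INTERNAL_TASK_ASSIGNOR_CLASS = "internal.task.assignor.class";

    interface Assignor {
        String name();
    }

    static final class DefaultAssignor implements Assignor {
        public String name() { return "default"; }
    }

    static final class AlternateAssignor implements Assignor {
        public String name() { return "alternate"; }
    }

    // Reads the class name (assumed to be a String), falling back to the default,
    // then checks the type and instantiates it through its no-arg constructor.
    static Assignor resolve(Map<String, Object> props) {
        String className = (String) props.getOrDefault(
            INTERNAL_TASK_ASSIGNOR_CLASS, DefaultAssignor.class.getName());
        try {
            Class<?> clazz = Class.forName(className);
            if (!Assignor.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(className + " does not implement Assignor");
            }
            return (Assignor) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

   Wrapping the checked reflection exceptions in `IllegalArgumentException` is one design choice among several; it keeps callers free of checked exceptions and reports a bad config value at startup.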









[GitHub] [kafka] mjsax commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests

2020-04-24 Thread GitBox


mjsax commented on a change in pull request #8532:
URL: https://github.com/apache/kafka/pull/8532#discussion_r414679179



##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
 
 # Start test harness
 self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)

Review comment:
   We can try to add both eos cases, too.
   
   `Streams[Complex]EosTestJobRunnerService` uses a different Java class than 
the "smoke test" runner. Not sure whether we could unify them.









[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414653962



##
File path: build.gradle
##
@@ -236,8 +236,10 @@ subprojects {
 def logStreams = new HashMap()
 beforeTest { TestDescriptor td ->
   def tid = testId(td)
+  // truncate the file name if it's too long
   def logFile = new File(
-  "${projectDir}/build/reports/testOutput/${tid}.test.stdout")
+  "${projectDir}/build/reports/testOutput/${tid.substring(0, Math.min(tid.size(),240))}.test.stdout"

Review comment:
   The only alternative I can think of is to parameterize the "short name" 
of the TaskAssignor, which seems kind of wacky.
   
   Also, it's worth noting that truncation has no impact if the file name is 
still unique. If the name is shared between two tests, there is still no impact 
if both tests pass. The only observable effect is that if one or both tests 
fail, their logs would get combined. It seems like we can afford to just 
defer this problem until it happens, if ever.
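   The `build.gradle` change keeps at most 240 characters of the test id. A Java rendering of the same rule (a hypothetical `TestLogName`, not part of the build) makes the collision case described here easy to see: two ids that share their first 240 characters map to the same log file. The 240 limit is presumably chosen so that the name plus the `.test.stdout` suffix stays under the common 255-byte file-name limit; that rationale is an assumption.

```java
// Hypothetical Java rendering of the Gradle truncation rule; not part of the build.
final class TestLogName {
    static final int MAX_ID_LENGTH = 240;

    // Keep at most MAX_ID_LENGTH characters of the test id, mirroring
    // tid.substring(0, Math.min(tid.size(), 240)) in build.gradle.
    static String logFileName(String testId) {
        return testId.substring(0, Math.min(testId.length(), MAX_ID_LENGTH)) + ".test.stdout";
    }
}
```

   Ids of 240 characters or fewer are unchanged, so only very long parameterized test names can ever collide.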









[GitHub] [kafka] cadonna commented on pull request #8529: KAFKA-9901:Fix streams_broker_bounce_test error

2020-04-24 Thread GitBox


cadonna commented on pull request #8529:
URL: https://github.com/apache/kafka/pull/8529#issuecomment-619080823


   In order to avoid duplicate work, I just want to let you know that I also 
have a PR that fixes this issue the same way: 
https://github.com/apache/kafka/pull/8532. I have already run the system tests 
on that PR to verify the fix. On my PR, there is also a discussion about which 
value to pass for the new constructor parameter. You can merge either this PR or 
the other one; I do not care. I just wanted to let you know.  







[GitHub] [kafka] vvcephei commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r414731001



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor topicExtractor,
 null,
 optimizableRepartitionNodeBuilder);
 
-final OptimizableRepartitionNode optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:
   Since you made the mistake of asking my opinion, here it is :) :
   
   > bumping the index
   It's true that users can't currently reuse the KStream, so there's no 
compatibility issue there, but we can't bump the index for the _first_ 
repartition topic, or we would break every topology that uses generated 
repartition topic names already. So, either way, we have to cache something to 
tell us to do something different on the "first reuse" (i.e., the second use of 
the KStream).
   
   Since we have to do that anyway, maybe it's fine to just cache the 
repartition node itself instead of a flag that says "bump the index next time". 
   
   > leaking optimizations into the DSL
   
   I'm on the fence about whether this is an "optimization" or "reasonable 
behavior". It sort of feels like the latter, and the only reason we needed to 
introduce the "repartition-collapsing" optimization is that we failed to 
introduce reasonable behavior from the beginning. Also, my read is that the DSL 
builder and the optimizer are not cleanly separated right now anyway, and if we 
ever want to build more optimizations, we'll most likely need to make another 
pass on both anyway. We're also starting to think about topology evolution (cc 
@cadonna ), which makes this a less scary prospect, as we can then implement a 
mechanism to _compatibly_ introduce new optimizations. In other words, I'm not 
taking a hard stance, but I'm leaning toward doing the more efficient thing 
rather than the purer thing, since we're not currently super pure anyway.
   
   > Other repartition topics
   
   I think we'd better leave it alone for now, implement topology evolution, 
then migrate to a completely pure and consistent approach.
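   A minimal sketch of the "cache the repartition node" idea: the first repartition for a given name creates a node, leaving the first generated topic name untouched so existing topologies keep working, and a later use with the same name reuses the cached node instead of creating a second repartition topic. `RepartitionCacheSketch`, `RepartitionNode`, and the `-repartition` naming scheme are hypothetical stand-ins for the `KStreamImpl` fields in the diff.

```java
// Hypothetical stand-in for the cached-repartition-node logic; not Kafka code.
final class RepartitionCacheSketch {
    static final class RepartitionNode {
        final String topicName;

        RepartitionNode(String topicName) {
            this.topicName = topicName;
        }
    }

    private RepartitionNode repartitionNode; // cached node, null until the first repartition
    private String repartitionName;          // name the cached node was built for

    // Mirrors the condition in the diff: build a new node only when nothing is
    // cached yet or the requested name differs; otherwise reuse the cached node,
    // so a second use of the same stream does not create a second topic.
    RepartitionNode repartitionFor(String name) {
        if (repartitionNode == null || !name.equals(repartitionName)) {
            // Hypothetical naming scheme, for illustration only.
            repartitionNode = new RepartitionNode(name + "-repartition");
            repartitionName = name;
        }
        return repartitionNode;
    }
}
```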









[GitHub] [kafka] cadonna commented on pull request #8529: KAFKA-9901:Fix streams_broker_bounce_test error

2020-04-24 Thread GitBox


cadonna commented on pull request #8529:
URL: https://github.com/apache/kafka/pull/8529#issuecomment-619140731


   @jiameixie My PR was merged.   







[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR

2020-04-24 Thread GitBox


junrao commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r414740034



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
 val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
   controllerContext.partitionsBeingReassigned.isEmpty &&
   !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
-  controllerContext.allTopics.contains(tp.topic))
+  controllerContext.allTopics.contains(tp.topic) &&
+  isPreferredLeaderInSync(tp)
+   )
 onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
   }
 }
   }
 
+  private def isPreferredLeaderInSync(tp: TopicPartition): Boolean = {

Review comment:
   Perhaps a more accurate name is canPreferredReplicaBeLeader()?
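   The new filter condition reduces to a small predicate. A minimal sketch, assuming the preferred replica is the first broker in the partition's replica assignment and that it may lead only if it is currently in the ISR; `PreferredLeaderCheck` and its signature are hypothetical and written in Java rather than the controller's Scala.

```java
import java.util.List;
import java.util.Set;

// Hypothetical predicate sketch; not the KafkaController code.
final class PreferredLeaderCheck {
    // The preferred replica is the first broker in the assignment. Triggering a
    // preferred election only makes sense if that broker is in the ISR.
    static boolean canPreferredReplicaBeLeader(List<Integer> assignment, Set<Integer> isr) {
        return !assignment.isEmpty() && isr.contains(assignment.get(0));
    }
}
```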









[GitHub] [kafka] ableegoldman commented on pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


ableegoldman commented on pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#issuecomment-619148319


   > 29 system tests failed
   
   Expected, but still upsetting  







[GitHub] [kafka] junrao commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-24 Thread GitBox


junrao commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-619128544


   @chia7712 : It's the same patch, but for a different branch.







[GitHub] [kafka] mjsax commented on a change in pull request #8483: KAFKA-9865: Expose output topic names from TopologyTestDriver

2020-04-24 Thread GitBox


mjsax commented on a change in pull request #8483:
URL: https://github.com/apache/kafka/pull/8483#discussion_r414732356



##
File path: streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
##
@@ -855,6 +856,20 @@ public void advanceWallClockTime(final Duration advance) {
 return new TestOutputTopic<>(this, topicName, keyDeserializer, valueDeserializer);
 }
 
+/**
+ * Get all the names of all the topics to which records have been output.
+ * 
+ * Call this method after piping the input into the test driver to retrieve the full set of topics the topology
+ * produced records to.
+ * 
+ * The returned set of topic names includes changelog, repartition and sink topic names.
+ *
+ * @return the set of output topic names.
+ */
+public final Set getOutputTopicNames() {

Review comment:
   @big-andy-coates Can you update the method name according to the KIP 
discussion?
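   The Javadoc in this hunk describes a driver that remembers every topic it has produced to. A minimal sketch of that bookkeeping, using a hypothetical `OutputTopicTracker` (not `TopologyTestDriver`'s internals, and not tied to whatever method name the KIP discussion settles on): record each topic on write and expose a read-only view.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical bookkeeping sketch; not TopologyTestDriver's implementation.
final class OutputTopicTracker {
    // Insertion-ordered, so names come back in the order they were first written.
    private final Set<String> outputTopics = new LinkedHashSet<>();

    // Called for every record the topology writes, whether to a sink,
    // repartition, or changelog topic. Duplicates are ignored by the set.
    void onRecordProduced(String topic) {
        outputTopics.add(topic);
    }

    // Read-only view of all topics written so far.
    Set<String> topicNames() {
        return Collections.unmodifiableSet(outputTopics);
    }
}
```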









[GitHub] [kafka] abbccdda commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests

2020-04-24 Thread GitBox


abbccdda commented on a change in pull request #8532:
URL: https://github.com/apache/kafka/pull/8532#discussion_r414636938



##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
 
 # Start test harness
 self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)

Review comment:
   I feel neutral about it. If we are adding 2X time to the system tests, 
that's probably not good.









[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-24 Thread GitBox


junrao commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-619127117


   ok to test







[GitHub] [kafka] hachikuji commented on pull request #8525: KAFKA-9885; Evict last members of a group when the maximum allowed is reached

2020-04-24 Thread GitBox


hachikuji commented on pull request #8525:
URL: https://github.com/apache/kafka/pull/8525#issuecomment-619142257


   ok to test







[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

2020-04-26 Thread GitBox


zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415334757



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -633,6 +634,45 @@ public final void copartitionSources(final Collection sourceNodes) {
 copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes)));
 }
 
+public void validateCoPartition() {
+final List> copartitionGroups =
+copartitionSourceGroups
+.stream()
+.map(sourceGroup -> sourceGroup
+.stream()
+.flatMap(node -> nodeToSourceTopics.get(node).stream())
+.collect(Collectors.toSet())
+).collect(Collectors.toList());
+for (final Set coPartition : copartitionGroups) {
+final Map coPartitionProperties = new HashMap<>();
+internalTopicNamesWithProperties.forEach((topic, prop) -> {
+if (coPartition.contains(topic) && prop.getNumberOfPartitions().isPresent()) {
+coPartitionProperties.put(topic, prop);
+}
+});
+if (coPartition.size() == coPartitionProperties.size()) {

Review comment:
   It's my pleasure.
   It means that not all input topics have a corresponding internal topic if 
coPartition.size() != coPartitionProperties.size(). In that case, we can just 
skip this validation. You can see the original validation in 
CopartitionedTopicsEnforcer#enforce:
   ```
   if (copartitionGroup.equals(repartitionTopicConfigs.keySet())) {
   ...
   validateAndGetNumOfPartitions
   ...
   }
   ```
   If some of the input topics don't have a repartition operation, their 
internal topic partition number can be deduced from the others that do. You 
can see 
KStreamRepartitionIntegrationTest#shouldDeductNumberOfPartitionsFromRepartitionOperation
 for more details.
   
   So we can skip this validation if coPartition.size() != 
coPartitionProperties.size().
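   The rule described in this comment can be sketched as follows: validate a co-partition group only when every topic in it has an explicitly known partition count, and otherwise skip, because the missing counts can be derived later. The map-based inputs, the `CoPartitionCheck` class, and the choice of `IllegalStateException` are hypothetical simplifications of `validateCoPartition`, not the code in the PR.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical simplification of the co-partition check; not Kafka code.
final class CoPartitionCheck {
    // group: topics that must be co-partitioned.
    // explicitPartitions: topic -> partition count, only for topics whose count is known.
    static void validate(Set<String> group, Map<String, Integer> explicitPartitions) {
        Set<Integer> counts = new HashSet<>();
        for (String topic : group) {
            Integer count = explicitPartitions.get(topic);
            if (count == null) {
                // Not every topic has an explicit count; the missing ones can be
                // derived from the others later, so skip validation for this group.
                return;
            }
            counts.add(count);
        }
        if (counts.size() > 1) {
            throw new IllegalStateException("Co-partitioned topics " + group
                + " have different partition counts: " + counts);
        }
    }
}
```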









[GitHub] [kafka] big-andy-coates commented on pull request #8483: KAFKA-9865: Expose output topic names from TopologyTestDriver

2020-04-26 Thread GitBox


big-andy-coates commented on pull request #8483:
URL: https://github.com/apache/kafka/pull/8483#issuecomment-619531323


   @mjsax updated.







[GitHub] [kafka] zhaohaidao commented on a change in pull request #8550: KAFKA-9850 Move KStream#repartition operator validation during Topolo…

2020-04-26 Thread GitBox


zhaohaidao commented on a change in pull request #8550:
URL: https://github.com/apache/kafka/pull/8550#discussion_r415335411



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
##
@@ -312,6 +312,7 @@ public void buildAndOptimizeTopology(final Properties props) {
 graphNodePriorityQueue.offer(graphNode);
 }
 }
+internalTopologyBuilder.validateCoPartition();

Review comment:
   I'm not sure whether we can remove the later-stage validation code.
   
   @mjsax @lkokhreidze Can you give some advice?









[GitHub] [kafka] hachikuji commented on a change in pull request #8525: KAFKA-9885; Evict last members of a group when the maximum allowed is reached

2020-04-24 Thread GitBox


hachikuji commented on a change in pull request #8525:
URL: https://github.com/apache/kafka/pull/8525#discussion_r414737521



##
File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
##
@@ -124,6 +124,35 @@ class GroupCoordinator(val brokerId: Int,
 info("Shutdown complete.")
   }
 
+  /**
+   * Verify if the group has space to accept the joining member. The various
+   * criteria are explained bellow.

Review comment:
   typo: `bellow`









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822406



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 internalTopologyBuilder,
 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+final int numStreamThreads;
+if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+log.warn("Overriding number of StreamThreads to zero for global-only topology");
+numStreamThreads = 0;
+} else {
+numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+}
+
 // create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+threads = new StreamThread[numStreamThreads];
+
+final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+final boolean hasGlobalTopology = globalTaskTopology != null;
+
+if (numStreamThreads == 0 && !hasGlobalTopology) {
+log.error("Must subscribe to at least one source topic or global table");
+throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
   I guess I can't personally imagine any reason to ever want an app running with an empty topology, and I would prefer to be notified immediately, since I presumably did something wrong. But if you feel strongly about allowing this, I can demote it to a warning.
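The thread-count decision in the hunk above can be read as a small pure function. A minimal sketch, assuming the two topology checks have already been reduced to booleans; `StreamThreadCount.resolve` is a hypothetical helper for illustration, not part of `KafkaStreams`:

```java
// Hypothetical helper mirroring the logic in the diff; not a Kafka Streams API.
final class StreamThreadCount {
    private StreamThreadCount() { }

    static int resolve(final boolean hasNonGlobalTopology,
                       final boolean hasGlobalTopology,
                       final int configuredThreads) {
        // A global-only topology needs no StreamThreads, so the configured value is overridden to zero.
        final int numStreamThreads = hasNonGlobalTopology ? configuredThreads : 0;
        // With no stream threads and no global thread, nothing would ever run: fail fast.
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            throw new IllegalArgumentException("Topology has no stream threads and no global threads");
        }
        return numStreamThreads;
    }
}
```

Keeping the check in the constructor path means a misconfigured (empty) topology is reported immediately, which is the behavior argued for above; demoting it to a warning would only change the `throw` into a log call.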









[GitHub] [kafka] steverod commented on pull request #8543: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-24 Thread GitBox


steverod commented on pull request #8543:
URL: https://github.com/apache/kafka/pull/8543#issuecomment-619214390


   > is this duplicate to #8542?
   
   Yes, this is aimed at 2.4 rather than 2.5. Took a best guess for propagation.







[GitHub] [kafka] vvcephei commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-04-24 Thread GitBox


vvcephei commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r414731001



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor topicExtractor,
 null,
 optimizableRepartitionNodeBuilder);
 
-final OptimizableRepartitionNode optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:
   Since you made the mistake of asking my opinion, here it is :) :
   
   > bumping the index
   
   It's true that users can't currently reuse the KStream, so there's no 
compatibility issue there, but we can't bump the index for the _first_ 
repartition topic, or we would break every topology that uses generated 
repartition topic names already. So, either way, we have to cache something to 
tell us to do something different on the "first reuse" (i.e., the second use of 
the KStream).
   
   Since we have to do that anyway, maybe it's fine to just cache the 
repartition node itself instead of a flag that says "bump the index next time". 
   
   > leaking optimizations into the DSL
   
   I'm on the fence about whether this is an "optimization" or "reasonable 
behavior". It sort of feels like the latter, and the only reason we needed to 
introduce the "repartition-collapsing" optimization is that we failed to 
introduce reasonable behavior from the beginning. Also, my read is that the DSL 
builder and the optimizer are not cleanly separated right now anyway, and if we 
ever want to build more optimizations, we'll most likely need to make another 
pass on both anyway. We're also starting to think about topology evolution (cc 
@cadonna ), which makes this a less scary prospect, as we can then implement a 
mechanism to _compatibly_ introduce new optimizations. In other words, I'm not taking a hard stance, but I'm leaning toward doing the more efficient thing rather than the more pure thing, since we're not currently super pure anyway.
   
   > Other repartition topics
   
   I think we'd better leave it alone for now, implement topology evolution, 
then migrate to a completely pure and consistent approach.
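A rough sketch of the "cache the repartition node itself" idea, using the same condition as the diff (`repartitionNode == null || !name.equals(repartitionName)`). `RepartitionCache` and `getOrCreate` are hypothetical names, and the node type is left generic rather than `OptimizableRepartitionNode`:

```java
import java.util.function.Supplier;

// Hypothetical sketch: remembers the repartition node built on first use so that
// a later use of the same stream under the same name reuses it.
final class RepartitionCache<N> {
    private N repartitionNode;      // node built on first use, or null
    private String repartitionName; // name the cached node was built for

    N getOrCreate(final String name, final Supplier<N> builder) {
        if (repartitionNode == null || !name.equals(repartitionName)) {
            // First use (or a different name): build a new node.
            repartitionNode = builder.get();
            repartitionName = name;
        }
        return repartitionNode;
    }
}
```

With this shape, the first use builds the node under its existing generated name, so topologies that already rely on generated repartition topic names are unaffected, and a second join on the same stream gets the cached node back instead of a second repartition topic.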









[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-24 Thread GitBox


vvcephei commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-619195143


   FYI, a recent change is that only committers can kick off the tests. It's 
mighty inconvenient, but better for security.







[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-24 Thread GitBox


vvcephei commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-619194786


   retest this please







[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-24 Thread GitBox


ConcurrencyPractitioner commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-619213065


   @vvcephei  Can you retrigger tests?







[GitHub] [kafka] ConcurrencyPractitioner commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-24 Thread GitBox


ConcurrencyPractitioner commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-619212877


   @vvcephei  Oh yeah, I was aware of that. Just recently, though, I was added to the Jenkins whitelist (my handle is listed in .asf.yaml). Supposedly, that should let me trigger tests, but it appears it hasn't taken effect.







[GitHub] [kafka] vvcephei commented on pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

2020-04-24 Thread GitBox


vvcephei commented on pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#issuecomment-619162446


   > > 29 system tests failed
   > 
   > Expected, but still upsetting :scream_cat:
   
   I should have said "130 system tests passed"







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414819765



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
 startStreamsAndCheckDirExists(topology, true);
 }
 
+@Test
+public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+assertThrows(
+IllegalArgumentException.class,
+() -> new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)
+);
+}
+
+@Test
+public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+final StreamsBuilder builder = new StreamsBuilder();
+builder.globalTable("anyTopic");
+final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+assertThat(streams.threads.length, equalTo(0));
+}
+
+@Test
+public void shouldNotTransitToErrorStateWithGlobalOnlyTopology() throws InterruptedException {
+final StreamsBuilder builder = new StreamsBuilder();
+builder.globalTable("anyTopic");
+final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+streams.setStateListener((newState, oldState) -> {
+if (newState.equals(State.ERROR)) {
+throw new AssertionError("Should not have transitioned to ERROR state with no stream threads");

Review comment:
   I guess we don't need to throw here; it would just cause KafkaStreams to transition to ERROR and fail below. But I realized this doesn't even do that, because we're mocking pretty much everything in this test class, including the stream threads. I'll try to find a better way and place to do this test.
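One common alternative to throwing from the listener is to record transitions and assert on them from the test thread afterwards. The sketch below uses hypothetical stand-ins (`State`, `StateListener`, `RecordingStateListener`) that only mirror the shape of `KafkaStreams.State` and `KafkaStreams.StateListener#onChange(newState, oldState)` so it has no Kafka dependency; it does not solve the mocking problem mentioned above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins for KafkaStreams.State / KafkaStreams.StateListener.
enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

interface StateListener {
    void onChange(State newState, State oldState);
}

// Records every new state instead of throwing from the callback, so the test
// thread can assert on the observed transitions after the fact.
final class RecordingStateListener implements StateListener {
    // Thread-safe list: callbacks may arrive on a different thread than the asserting one.
    private final List<State> seen = new CopyOnWriteArrayList<>();

    @Override
    public void onChange(final State newState, final State oldState) {
        seen.add(newState);
    }

    boolean sawState(final State state) {
        return seen.contains(state);
    }
}
```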









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414825387



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
 startStreamsAndCheckDirExists(topology, true);
 }
 
+@Test
+public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+assertThrows(

Review comment:
   Ack









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414832679



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
 @Test
 public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
   Would a class-level `final StreamsBuilder builder = new StreamsBuilder()` that we add a source to in `setUp` be any better, IYO?









[GitHub] [kafka] d8tltanc commented on a change in pull request #8527: Remove dependencies on deprecated --zookeeper command flags in junit tests

2020-04-24 Thread GitBox


d8tltanc commented on a change in pull request #8527:
URL: https://github.com/apache/kafka/pull/8527#discussion_r414844095



##
File path: 
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##
@@ -1408,13 +1408,23 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 sslStoreProps.put(KafkaConfig.PasswordEncoderSecretProp, kafkaConfig.passwordEncoderSecret.map(_.value).orNull)
 zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
 
-val args = Array("--zookeeper", kafkaConfig.zkConnect,
-  "--alter", "--add-config", sslStoreProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","),
-  "--entity-type", "brokers",
-  "--entity-name", kafkaConfig.brokerId.toString)
-ConfigCommand.main(args)
+val entityType = ConfigType.Broker
+val entityName = kafkaConfig.brokerId.toString
 
+val passwordConfigs = sslStoreProps.asScala.keySet.filter(DynamicBrokerConfig.isPasswordConfig)
+val passwordEncoderConfigs = new Properties
+passwordEncoderConfigs ++= sslStoreProps.asScala.filter { case (key, _) => key.startsWith("password.encoder.") }

Review comment:
   Good catch. Deleted.









[GitHub] [kafka] guozhangwang commented on pull request #8535: KAFKA-9903

2020-04-24 Thread GitBox


guozhangwang commented on pull request #8535:
URL: https://github.com/apache/kafka/pull/8535#issuecomment-619168284


   test this please







[GitHub] [kafka] vvcephei commented on pull request #8254: KIP-557: Add Emit On Change Support

2020-04-24 Thread GitBox


vvcephei commented on pull request #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-619194659


   test this please







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414823010



##
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
 @Test
 public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
   Is `getBuilderWithSource` really that much uglier than `new 
StreamsBuilder`? :P 









[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

2020-04-24 Thread GitBox


ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
 internalTopologyBuilder,
 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+final int numStreamThreads;
+if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+log.warn("Overriding number of StreamThreads to zero for global-only topology");
+numStreamThreads = 0;
+} else {
+numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+}
+
 // create the stream thread, global update thread, and cleanup thread
-threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+threads = new StreamThread[numStreamThreads];
+
+final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+final boolean hasGlobalTopology = globalTaskTopology != null;
+
+if (numStreamThreads == 0 && !hasGlobalTopology) {
+log.error("Must subscribe to at least one source topic or global table");
+throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
   Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type?









[GitHub] [kafka] junrao commented on pull request #8542: [KAFKA-9826] Handle an unaligned first dirty offset during log cleani…

2020-04-24 Thread GitBox


junrao commented on pull request #8542:
URL: https://github.com/apache/kafka/pull/8542#issuecomment-619216927


   retest this please







[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413086046



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {
+  def this(delayMs: Long) = this(delayMs, Time.SYSTEM)

Review comment:
   Thanks, yes, I was just coming back to say your suggestion was better 
for testability reasons.
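A Java analogue of the testability argument, as a sketch only: the clock is injected through the constructor and kept private, and the default constructor uses the monotonic `System.nanoTime()` rather than wall-clock time. In the actual Scala change the injected object is `org.apache.kafka.common.utils.Time`; the `LongSupplier` here is a stand-in assumption so the example has no Kafka dependency.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical Java analogue of kafka.utils.DelayedItem; not the Kafka class.
final class DelayedItem {
    private final LongSupplier nanoClock; // private: callers cannot reach the clock
    private final long dueNs;

    DelayedItem(final long delayMs, final LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
        this.dueNs = nanoClock.getAsLong() + TimeUnit.MILLISECONDS.toNanos(delayMs);
    }

    // Production default: the JVM's monotonic clock, unaffected by wall-clock adjustments.
    DelayedItem(final long delayMs) {
        this(delayMs, System::nanoTime);
    }

    // Remaining delay in ms, never negative. Compare nanoTime values by subtraction, per its javadoc.
    long remainingMs() {
        final long remainingNs = dueNs - nanoClock.getAsLong();
        return Math.max(0L, TimeUnit.NANOSECONDS.toMillis(remainingNs));
    }
}
```

Because a test can pass a fake clock, expiry can be checked deterministically without sleeping.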









[GitHub] [kafka] belugabehr opened a new pull request #8531: KAFAKA-9904: Use ThreadLocalConcurrent to Replace Random

2020-04-22 Thread GitBox


belugabehr opened a new pull request #8531:
URL: https://github.com/apache/kafka/pull/8531


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] cadonna opened a new pull request #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna opened a new pull request #8532:
URL: https://github.com/apache/kafka/pull/8532


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] cadonna commented on issue #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna commented on issue #8532:
URL: https://github.com/apache/kafka/pull/8532#issuecomment-617883911


   Streams system tests run: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3914







[GitHub] [kafka] leonardge opened a new pull request #8533: Fixed bug in log validator tests.

2020-04-22 Thread GitBox


leonardge opened a new pull request #8533:
URL: https://github.com/apache/kafka/pull/8533


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


ijuma commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413077069



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {

Review comment:
   Does `time` need to be a `val`?

##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {

Review comment:
   Does `time` need to be a public `val`?









[GitHub] [kafka] lbradstreet commented on a change in pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-04-22 Thread GitBox


lbradstreet commented on a change in pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#discussion_r413085641



##
File path: core/src/main/scala/kafka/utils/DelayedItem.scala
##
@@ -21,24 +21,19 @@ import java.util.concurrent._
 
 import org.apache.kafka.common.utils.Time
 
-import scala.math._
+class DelayedItem(val delayMs: Long, val time: Time) extends Logging {

Review comment:
   Good point. I'll make it private.









[GitHub] [kafka] hachikuji commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


hachikuji commented on a change in pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#discussion_r413091037



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3084,12 +3084,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 // Broker epoch in LeaderAndIsr/UpdateMetadata/StopReplica request is unknown
 // if the controller hasn't been upgraded to use KIP-380
 if (brokerEpochInRequest == AbstractControlRequest.UNKNOWN_BROKER_EPOCH) false
-else {
-  val curBrokerEpoch = controller.brokerEpoch
-  if (brokerEpochInRequest < curBrokerEpoch) true
-  else if (brokerEpochInRequest == curBrokerEpoch) false
-  else throw new IllegalStateException(s"Epoch $brokerEpochInRequest larger than current broker epoch $curBrokerEpoch")
-}
+else brokerEpochInRequest < controller.brokerEpoch

Review comment:
   A short comment here about the case where the controller sees the epoch bump first may be helpful.
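A minimal sketch of the simplified predicate, with the kind of comment suggested above. `BrokerEpochCheck` is a hypothetical name, and the value `-1` for the unknown epoch is an assumption standing in for `AbstractControlRequest.UNKNOWN_BROKER_EPOCH`:

```java
// Hypothetical Java rendering of the Scala predicate in KafkaApis; not Kafka code.
final class BrokerEpochCheck {
    static final long UNKNOWN_BROKER_EPOCH = -1L; // assumed stand-in value

    private BrokerEpochCheck() { }

    static boolean isBrokerEpochStale(final long brokerEpochInRequest, final long currentBrokerEpoch) {
        // A controller that predates KIP-380 sends no epoch; never treat such requests as stale.
        if (brokerEpochInRequest == UNKNOWN_BROKER_EPOCH) {
            return false;
        }
        // Only an older epoch is stale. A newer epoch is accepted rather than rejected,
        // because the controller can see the broker's epoch bump before the broker does.
        return brokerEpochInRequest < currentBrokerEpoch;
    }
}
```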









[GitHub] [kafka] dajac commented on issue #8454: KAFKA-9844; Maximum number of members within a group is not always enforced due to a race condition in join group

2020-04-22 Thread GitBox


dajac commented on issue #8454:
URL: https://github.com/apache/kafka/pull/8454#issuecomment-617907578


   @hachikuji Could we get this one merged?







[GitHub] [kafka] cadonna commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna commented on a change in pull request #8532:
URL: https://github.com/apache/kafka/pull/8532#discussion_r413110487



##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
 
 # Start test harness
 self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)

Review comment:
   Is it enough to specify the processing guarantee as `at_least_once` here, or do you also want this test to include all processing guarantees in the test matrix?









[GitHub] [kafka] cadonna commented on issue #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna commented on issue #8532:
URL: https://github.com/apache/kafka/pull/8532#issuecomment-617869532


   Call for review: @mjsax @abbccdda 







[GitHub] [kafka] cadonna commented on a change in pull request #8532: HOTFIX: Fix broker bounce system tests

2020-04-22 Thread GitBox


cadonna commented on a change in pull request #8532:
URL: https://github.com/apache/kafka/pull/8532#discussion_r413116285



##
File path: tests/kafkatest/tests/streams/streams_broker_bounce_test.py
##
@@ -164,7 +164,7 @@ def setup_system(self, start_processor=True, num_threads=3):
 
 # Start test harness
 self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
-self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, num_threads)
+self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka, "at_least_once", num_threads)

Review comment:
   On a different note, now that the processing guarantee can be passed to 
the service, do we still need `StreamsEosTestJobRunnerService` and 
`StreamsComplexEosTestJobRunnerService`?









[GitHub] [kafka] vvcephei commented on issue #8520: Add explicit grace period to tumbling window example

2020-04-21 Thread GitBox


vvcephei commented on issue #8520:
URL: https://github.com/apache/kafka/pull/8520#issuecomment-617376010


   Gah, I forgot to add the reviewers to the merge commit.







[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412422082



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,103 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
 
-final TaskId task;
-final UUID source;
-final UUID destination;
+class TaskMovement {
+private final TaskId task;
+private final UUID destination;
+private final SortedSet caughtUpClients;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+private TaskMovement(final TaskId task, final UUID destination, final SortedSet caughtUpClients) {
 this.task = task;
-this.source = source;
 this.destination = destination;
-}
+this.caughtUpClients = caughtUpClients;
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
+if (caughtUpClients == null || caughtUpClients.isEmpty()) {
+throw new IllegalStateException("Should not attempt to move a task if no caught up clients exist");
 }
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
 }
 
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad = new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : 

[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617392150


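   I also took a look at the foreign-key join test, which is actually telling us something awesome: your feature allows us to drop the _unnecessary_ tombstones that inner joins would otherwise send, as long as the final join result is materialized (either explicitly or implicitly by a subsequent join).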
   
   Anyway, it's complicated, so here's a fix for the test:
   ```diff
   --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
   +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyJoinIntegrationTest.java
   @@ -48,6 +48,7 @@ import java.util.Properties;
import java.util.function.Function;

import static java.util.Collections.emptyMap;
   +import static java.util.Collections.singletonMap;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
   @@ -371,12 +372,16 @@ public class KTableKTableForeignKeyJoinIntegrationTest {

// Deleting a non-joining record produces an unnecessary tombstone for inner joins, because
// it's not possible to know whether a result was previously emitted.
   +// HOWEVER, when the final join result is materialized (either explicitly or
   +// implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
// For the left join, the tombstone is necessary.
left.pipeInput("lhs1", (String) null);
{
assertThat(
outputTopic.readKeyValuesToMap(),
   -is(mkMap(mkEntry("lhs1", null)))
   +is(leftJoin || !(materialized || rejoin)
   +   ? mkMap(mkEntry("lhs1", null))
   +   : emptyMap())
);
if (materialized) {
assertThat(
   @@ -452,12 +457,15 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
}
// "moving" our subscription to another non-existent FK results in an unnecessary tombstone for inner join,
// since it impossible to know whether the prior FK existed or not (and thus whether any results have
   -// previously been emitted)
   +// previously been emitted). HOWEVER, when the final join result is materialized (either explicitly or
   +// implicitly by a subsequent join), we _can_ detect that the tombstone is unnecessary and drop it.
// The left join emits a _necessary_ update (since the lhs record has actually changed)
left.pipeInput("lhs1", "lhsValue1|rhs2");
assertThat(
outputTopic.readKeyValuesToMap(),
   -is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs2,null)" : null)))
   +is(leftJoin
   +   ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs2,null)"))
   +   : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
);
if (materialized) {
assertThat(
   @@ -469,7 +477,9 @@ public class KTableKTableForeignKeyJoinIntegrationTest {
left.pipeInput("lhs1", "lhsValue1|rhs3");
assertThat(
outputTopic.readKeyValuesToMap(),
   -is(mkMap(mkEntry("lhs1", leftJoin ? "(lhsValue1|rhs3,null)" : null)))
   +is(leftJoin
   +   ? mkMap(mkEntry("lhs1", "(lhsValue1|rhs3,null)"))
   +   : (materialized || rejoin) ? emptyMap() : singletonMap("lhs1", null))
);
if (materialized) {
assertThat(
   ```
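For readers following the KIP-557 discussion, here is a minimal sketch of the emit-on-change behaviour these test changes accommodate. It is not the Kafka Streams implementation: the class `EmitOnChangeTable` is hypothetical, and it assumes a materialized table that remembers the latest value per key, drops an update whose value equals the stored one (including a tombstone for a key with no stored value), and forwards every other update.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of emit-on-change semantics for a materialized table.
public class EmitOnChangeTable {
    private final Map<String, String> store = new HashMap<>();
    private final List<String> forwarded = new ArrayList<>();

    // Applies an update (null value = tombstone); forwards it only if it changes the stored state.
    public boolean put(final String key, final String value) {
        final String oldValue = store.get(key); // null if the key has no stored value
        if (Objects.equals(oldValue, value)) {
            // Idempotent update, or a tombstone for a key that has no value: drop it.
            return false;
        }
        if (value == null) {
            store.remove(key);
        } else {
            store.put(key, value);
        }
        forwarded.add(key + "=" + value);
        return true;
    }

    public List<String> forwarded() {
        return forwarded;
    }

    public static void main(final String[] args) {
        final EmitOnChangeTable table = new EmitOnChangeTable();
        table.put("lhs1", null);    // dropped: nothing is stored for lhs1
        table.put("A", "ignored");  // forwarded
        table.put("A", "ignored");  // dropped: same value as before
        table.put("A", "ignored1"); // forwarded
        System.out.println(table.forwarded()); // prints [A=ignored, A=ignored1]
    }
}
```

Under this model, piping the same value twice, or deleting a key whose result was never stored, produces nothing downstream, which is why the updated tests either use distinct values or expect fewer records.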







[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617423139


   Ok, last one. `KTableTransformValuesTest` is just another case where the test's input data (the same value piped repeatedly for key `A`) is now treated as an idempotent update, which is fine:
   
   ```diff
   --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
   +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableTransformValuesTest.java
   @@ -398,8 +398,8 @@ public class KTableTransformValuesTest {
driver.createInputTopic(INPUT_TOPIC, new StringSerializer(), new StringSerializer());

inputTopic.pipeInput("A", "ignored", 5L);
   -inputTopic.pipeInput("A", "ignored", 15L);
   -inputTopic.pipeInput("A", "ignored", 10L);
   +inputTopic.pipeInput("A", "ignored1", 15L);
   +inputTopic.pipeInput("A", "ignored2", 10L);

assertThat(output(), hasItems(new KeyValueTimestamp<>("A", "1", 5),
new KeyValueTimestamp<>("A", "0", 15),
   ```







[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617439325


   Unrelated Java 11 failures:
   kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
   org.apache.kafka.streams.integration.StoreQueryIntegrationTest.shouldQueryAllStalePartitionStores







[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617364180


   test this please







[GitHub] [kafka] vvcephei commented on issue #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on issue #8497:
URL: https://github.com/apache/kafka/pull/8497#issuecomment-617364803


   test this please







[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617423885


   I hope I didn't step on your toes, @ConcurrencyPractitioner, but I just wanted to make sure you're unblocked to finish up this PR. Figuring out exactly what's wrong with those tests, and whether it's OK, can be a bit subtle.







[GitHub] [kafka] surabhidixit opened a new pull request #8526: KAFKA-6867: corrected the typos in upgrade.html

2020-04-21 Thread GitBox


surabhidixit opened a new pull request #8526:
URL: https://github.com/apache/kafka/pull/8526


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617418517


   The source topic restart integration test was actually just failing because the tests were polluting each other's topics. This is one way to fix it:
   
   ```diff
   diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
   index 3ec239fab9..b42a5852a5 100644
   --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
   +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableSourceTopicRestartIntegrationTest.java
   @@ -100,6 +100,7 @@ public class KTableSourceTopicRestartIntegrationTest {
@After
public void after() throws Exception {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
   +CLUSTER.deleteAllTopicsAndWait(60_000L);
}

@Test
   ```







[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412423011



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ValidClientsByTaskLoadQueue.java
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiFunction;
+import org.apache.kafka.streams.processor.TaskId;
+
+/**
+ * Wraps a priority queue of clients and returns the next valid candidate(s) based on the current task assignment
+ */
+class ValidClientsByTaskLoadQueue {
+private final PriorityQueue clientsByTaskLoad;
+private final BiFunction validClientCriteria;
+
+ValidClientsByTaskLoadQueue(final Map clientStates,
+final BiFunction validClientCriteria) {

Review comment:
   Ah, sorry about that, @ableegoldman; I wasn't able (or was too lazy) to follow the `git praise` trail through the class movement. Well, kudos to you, then. :)
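The `ValidClientsByTaskLoadQueue` javadoc quoted above describes wrapping a priority queue of clients and returning the next valid candidate. The sketch below illustrates that idea only; it is not the PR's class. It assumes clients and tasks are plain `String` IDs, load is a simple per-client task count, and validity is a `BiFunction` over (client, task), like the `validClientCriteria` constructor parameter in the diff. `LoadQueueSketch` and `pollAndAssign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiFunction;

// Hypothetical sketch: hand out the least-loaded client that satisfies a validity criterion.
public class LoadQueueSketch {
    private final Map<String, Integer> load;
    private final PriorityQueue<String> queue;
    private final BiFunction<String, String, Boolean> validClientCriteria;

    public LoadQueueSketch(final Map<String, Integer> initialLoad,
                           final BiFunction<String, String, Boolean> validClientCriteria) {
        this.load = new HashMap<>(initialLoad);
        // Order by current load, breaking ties by client ID so the order is deterministic.
        this.queue = new PriorityQueue<>(
            Comparator.<String>comparingInt(client -> this.load.get(client))
                      .thenComparing(Comparator.naturalOrder()));
        this.queue.addAll(this.load.keySet());
        this.validClientCriteria = validClientCriteria;
    }

    // Returns the least-loaded valid client for the task (or null if none) and charges it one task.
    public String pollAndAssign(final String task) {
        final List<String> skipped = new ArrayList<>();
        String chosen = null;
        while (!queue.isEmpty()) {
            final String candidate = queue.poll();
            if (validClientCriteria.apply(candidate, task)) {
                chosen = candidate;
                break;
            }
            skipped.add(candidate);
        }
        // Invalid candidates go back unchanged.
        queue.addAll(skipped);
        if (chosen != null) {
            // Update the load while the client is out of the queue, then re-insert it.
            load.merge(chosen, 1, Integer::sum);
            queue.add(chosen);
        }
        return chosen;
    }
}
```

Because a client's load only changes while it is outside the queue, the heap ordering never goes stale; mutating the load of a queued element would silently break `PriorityQueue`'s invariant.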









[GitHub] [kafka] vvcephei commented on a change in pull request #8497: KAFKA-6145: KIP-441 Build state constrained assignment from balanced one

2020-04-21 Thread GitBox


vvcephei commented on a change in pull request #8497:
URL: https://github.com/apache/kafka/pull/8497#discussion_r412419973



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -16,128 +16,94 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClient;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class TaskMovement {
-private static final Logger log = LoggerFactory.getLogger(TaskMovement.class);
-
 final TaskId task;
-final UUID source;
-final UUID destination;
+private final UUID destination;
 
-TaskMovement(final TaskId task, final UUID source, final UUID destination) {
+TaskMovement(final TaskId task, final UUID destination) {
 this.task = task;
-this.source = source;
 this.destination = destination;
 }
 
-@Override
-public boolean equals(final Object o) {
-if (this == o) {
-return true;
-}
-if (o == null || getClass() != o.getClass()) {
-return false;
-}
-final TaskMovement movement = (TaskMovement) o;
-return Objects.equals(task, movement.task) &&
-   Objects.equals(source, movement.source) &&
-   Objects.equals(destination, movement.destination);
-}
-
-@Override
-public int hashCode() {
-return Objects.hash(task, source, destination);
-}
-
 /**
- * Computes the movement of tasks from the state constrained to the balanced assignment, up to the configured
- * {@code max.warmup.replicas}. A movement corresponds to a warmup replica on the destination client, with
- * a few exceptional cases:
- * 
- * 1. Tasks whose destination clients are caught-up, or whose source clients are not caught-up, will be moved
- * immediately from the source to the destination in the state constrained assignment
- * 2. Tasks whose destination client previously had this task as a standby will not be counted towards the total
- * {@code max.warmup.replicas}. Instead they will be counted against that task's total {@code num.standby.replicas}.
- *
- * @param statefulActiveTaskAssignment the initial, state constrained assignment, with the source clients
- * @param balancedStatefulActiveTaskAssignment the final, balanced assignment, with the destination clients
- * @return list of the task movements from statefulActiveTaskAssignment to balancedStatefulActiveTaskAssignment
+ * @return whether any warmup replicas were assigned
  */
-static List getMovements(final Map> statefulActiveTaskAssignment,
-   final Map> balancedStatefulActiveTaskAssignment,
-   final Map> tasksToCaughtUpClients,
-   final Map clientStates,
-   final Map tasksToRemainingStandbys,
-   final int maxWarmupReplicas) {
-if (statefulActiveTaskAssignment.size() != balancedStatefulActiveTaskAssignment.size()) {
-throw new IllegalStateException("Tried to compute movements but assignments differ in size.");
-}
+static boolean assignTaskMovements(final Map> statefulActiveTaskAssignment,
+   final Map> tasksToCaughtUpClients,
+   final Map clientStates,
+   final Map tasksToRemainingStandbys,
+   final int maxWarmupReplicas) {
+boolean warmupReplicasAssigned = false;
+
+final ValidClientsByTaskLoadQueue clientsByTaskLoad =
+new ValidClientsByTaskLoadQueue(
+clientStates,
+(client, task) -> taskIsCaughtUpOnClient(task, client, tasksToCaughtUpClients)
+);
 
-final Map taskToDestinationClient = new HashMap<>();
-for (final Map.Entry> clientEntry : balancedStatefulActiveTaskAssignment.entrySet()) {
-final UUID destination = clientEntry.getKey();
-for (final TaskId task : clientEntry.getValue()) {
-taskToDestinationClient.put(task, destination);
+final SortedSet taskMovements = new TreeSet<>(
+(movement, other) -> {
+final int numCaughtUpClients = tasksToCaughtUpClients.get(movement.task).size();
+final int 

[GitHub] [kafka] vvcephei commented on issue #8254: KIP-557: Add Emit On Change Support

2020-04-21 Thread GitBox


vvcephei commented on issue #8254:
URL: https://github.com/apache/kafka/pull/8254#issuecomment-617372772


   Hey @ConcurrencyPractitioner, sorry it took so long. This one is again just because the test happened to expect idempotent updates to flow through, though not for any important reason. Simply changing the value of the "tick" record the second time it is piped in fixes it without breaking anything about the test.
   
   Here's my diff:
   ```diff
   --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
   +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
   @@ -615,11 +615,11 @@ public class SuppressScenarioTest {
);


   -inputTopicRight.pipeInput("tick", "tick", 21L);
   +inputTopicRight.pipeInput("tick", "tick1", 21L);
verify(
drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
asList(
   -new KeyValueTimestamp<>("tick", "(null,tick)", 21), // just a testing artifact
   +new KeyValueTimestamp<>("tick", "(null,tick1)", 21), // just a testing artifact
new KeyValueTimestamp<>("A", "(b,2)", 13L)
)
);
   @@ -703,11 +703,11 @@ public class SuppressScenarioTest {
);


   -inputTopicLeft.pipeInput("tick", "tick", 21L);
   +inputTopicLeft.pipeInput("tick", "tick1", 21L);
verify(
drainProducerRecords(driver, "output", STRING_DESERIALIZER, STRING_DESERIALIZER),
asList(
   -new KeyValueTimestamp<>("tick", "(tick,null)", 21), // just a testing artifact
   +new KeyValueTimestamp<>("tick", "(tick1,null)", 21), // just a testing artifact
new KeyValueTimestamp<>("A", "(2,b)", 13L)
)
);
   ```







[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r413442881



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+ */
+private void validateOffsetsAsync(Map partitionsToValidate) {
+final Map> regrouped =
+regroupFetchPositionsByLeader(partitionsToValidate);
+
+regrouped.forEach((node, fetchPositions) -> {
+if (node.isEmpty()) {
+metadata.requestUpdate();
+return;
+}
+
+NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+if (nodeApiVersions == null) {
+client.tryConnect(node);
+return;
+}
+
+if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+  "support the required protocol version (introduced in Kafka 2.3)",
+fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+// We need to get the client epoch state before sending out the leader epoch request, and use it to
+// decide whether we need to validate offsets.
+if (!metadata.hasReliableLeaderEpochs()) {

Review comment:
   Np, will do
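The early returns in the quoted `validateOffsetsAsync` amount to a per-node decision. The sketch below condenses them into one function over booleans. The enum, method, and parameter names are hypothetical, and the last branch (skipping validation when leader epochs are not reliable) is an assumption based on the PR title, because the quoted hunk is cut off at that `if`.

```java
// Hypothetical condensation of the per-node checks in the quoted validateOffsetsAsync hunk.
public class OffsetValidationDecision {
    enum Action { REQUEST_METADATA_UPDATE, CONNECT_FIRST, SKIP_VALIDATION, SEND_REQUEST }

    static Action decide(final boolean leaderKnown,
                         final boolean apiVersionsKnown,
                         final boolean supportsOffsetForLeaderEpoch,
                         final boolean reliableLeaderEpochs) {
        if (!leaderKnown) {
            return Action.REQUEST_METADATA_UPDATE; // node.isEmpty(): refresh metadata first
        }
        if (!apiVersionsKnown) {
            return Action.CONNECT_FIRST;           // no ApiVersions yet: try to connect
        }
        if (!supportsOffsetForLeaderEpoch) {
            return Action.SKIP_VALIDATION;         // broker too old: complete validations
        }
        if (!reliableLeaderEpochs) {
            return Action.SKIP_VALIDATION;         // assumption from the PR title
        }
        return Action.SEND_REQUEST;
    }
}
```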









[GitHub] [kafka] jiameixie commented on issue #8529: KAFKA-9901:Fix streams_broker_bounce_test error

2020-04-22 Thread GitBox


jiameixie commented on issue #8529:
URL: https://github.com/apache/kafka/pull/8529#issuecomment-618169833


   @guozhangwang @ijuma @junrao PTAL, thanks







[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR

2020-04-22 Thread GitBox


junrao commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r413436495



##
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##
@@ -1068,7 +1068,9 @@ class KafkaController(val config: KafkaConfig,
 val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
   controllerContext.partitionsBeingReassigned.isEmpty &&
   !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
-  controllerContext.allTopics.contains(tp.topic))
+  controllerContext.allTopics.contains(tp.topic) &&
+  controllerContext.partitionLeadershipInfo.get(tp).forall(l => l.leaderAndIsr.isr.contains(leaderBroker))

Review comment:
   The preferred leader election also checks for live brokers. So perhaps we could just call `PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection()` here.
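As a sketch of the rule being discussed, assuming plain integer broker IDs: the preferred leader is the first replica in the partition's assignment, and it is electable only if that broker is live and in the ISR. This is a hypothetical Java illustration (`PreferredLeaderSketch` is not a Kafka class), not the controller's Scala code.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class PreferredLeaderSketch {
    // Returns the preferred replica if it may become leader, otherwise empty.
    static Optional<Integer> preferredReplicaLeader(final List<Integer> assignment,
                                                    final Set<Integer> isr,
                                                    final Set<Integer> liveBrokers) {
        if (assignment.isEmpty()) {
            return Optional.empty();
        }
        final int preferred = assignment.get(0); // first replica in the assignment
        // Both conditions are required: being in the ISR alone is not enough if the broker is down.
        return liveBrokers.contains(preferred) && isr.contains(preferred)
            ? Optional.of(preferred)
            : Optional.empty();
    }
}
```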









[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


hachikuji commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-618070328











[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


hachikuji commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-618070266


   ok to test







[GitHub] [kafka] guozhangwang commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol

2020-04-22 Thread GitBox


guozhangwang commented on a change in pull request #8326:
URL: https://github.com/apache/kafka/pull/8326#discussion_r413374355



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
##
@@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-private static final String TOPICS_KEY_NAME = "topics";
-private static final String PARTITIONS_KEY_NAME = "partitions";
-
-private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-TRANSACTIONAL_ID,
-PRODUCER_ID,
-PRODUCER_EPOCH,
-new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-TOPIC_NAME,
-new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32,
-"The partitions to add to the transaction."));
-
-/**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
-private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-public static Schema[] schemaVersions() {
-return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-}
+
+public final AddPartitionsToTxnRequestData data;
 
 public static class Builder extends AbstractRequest.Builder {
-private final String transactionalId;
-private final long producerId;
-private final short producerEpoch;
-private final List partitions;
+public final AddPartitionsToTxnRequestData data;
 
-public Builder(String transactionalId, long producerId, short producerEpoch, List partitions) {
+public Builder(final AddPartitionsToTxnRequestData data) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.transactionalId = transactionalId;
-this.producerId = producerId;
-this.producerEpoch = producerEpoch;
-this.partitions = partitions;
+this.data = data;
+}
+
+public Builder(final String transactionalId,
+   final long producerId,
+   final short producerEpoch,
+   final List partitions) {
+super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+Map> partitionMap = new HashMap<>();
+for (TopicPartition topicPartition : partitions) {
+String topicName = topicPartition.topic();
+
+List subPartitions = partitionMap.getOrDefault(topicName,
+new ArrayList<>());
+subPartitions.add(topicPartition.partition());
+partitionMap.put(topicName, subPartitions);
+}
+
+AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
+for (Map.Entry> partitionEntry : partitionMap.entrySet()) {
+topics.add(new AddPartitionsToTxnTopic()
+   .setName(partitionEntry.getKey())
+   .setPartitions(partitionEntry.getValue()));
+}
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setTopics(topics);
 }
 
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
+return new AddPartitionsToTxnRequest(data, version);
 

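The new `Builder` constructor quoted above groups `TopicPartition`s by topic with `getOrDefault` followed by `put`. An equivalent grouping can be written with `Map.computeIfAbsent`, which creates each topic's list once and then appends to it in place. This sketch uses a hypothetical stand-in `TopicPartition` class instead of Kafka's, and stops before building the generated `AddPartitionsToTxnTopic` objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupPartitionsByTopic {
    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        String topic() { return topic; }
        int partition() { return partition; }
    }

    // Groups partition numbers by topic name, preserving first-seen topic order.
    static Map<String, List<Integer>> groupByTopic(final List<TopicPartition> partitions) {
        final Map<String, List<Integer>> partitionMap = new LinkedHashMap<>();
        for (final TopicPartition tp : partitions) {
            partitionMap.computeIfAbsent(tp.topic(), topic -> new ArrayList<>()).add(tp.partition());
        }
        return partitionMap;
    }
}
```

`LinkedHashMap` is used only so the sketch's output order is predictable; the quoted code uses a `HashMap`.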
[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8326:
URL: https://github.com/apache/kafka/pull/8326#discussion_r413471840



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java
##
@@ -17,157 +17,109 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopic;
+import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.AddPartitionsToTxnTopicCollection;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_EPOCH;
-import static org.apache.kafka.common.protocol.CommonFields.PRODUCER_ID;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.CommonFields.TRANSACTIONAL_ID;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-
 public class AddPartitionsToTxnRequest extends AbstractRequest {
-private static final String TOPICS_KEY_NAME = "topics";
-private static final String PARTITIONS_KEY_NAME = "partitions";
-
-private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V0 = new Schema(
-TRANSACTIONAL_ID,
-PRODUCER_ID,
-PRODUCER_EPOCH,
-new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-TOPIC_NAME,
-new Field(PARTITIONS_KEY_NAME, new ArrayOf(INT32,
-"The partitions to add to the transaction."));
-
-/**
- * The version number is bumped to indicate that on quota violation brokers send out responses before throttling.
- */
-private static final Schema ADD_PARTITIONS_TO_TXN_REQUEST_V1 = ADD_PARTITIONS_TO_TXN_REQUEST_V0;
-
-public static Schema[] schemaVersions() {
-return new Schema[]{ADD_PARTITIONS_TO_TXN_REQUEST_V0, ADD_PARTITIONS_TO_TXN_REQUEST_V1};
-}
+
+public final AddPartitionsToTxnRequestData data;
 
 public static class Builder extends AbstractRequest.Builder<AddPartitionsToTxnRequest> {
-private final String transactionalId;
-private final long producerId;
-private final short producerEpoch;
-private final List<TopicPartition> partitions;
+public final AddPartitionsToTxnRequestData data;
 
-public Builder(String transactionalId, long producerId, short producerEpoch, List<TopicPartition> partitions) {
+public Builder(final AddPartitionsToTxnRequestData data) {
 super(ApiKeys.ADD_PARTITIONS_TO_TXN);
-this.transactionalId = transactionalId;
-this.producerId = producerId;
-this.producerEpoch = producerEpoch;
-this.partitions = partitions;
+this.data = data;
+}
+
+public Builder(final String transactionalId,
+   final long producerId,
+   final short producerEpoch,
+   final List<TopicPartition> partitions) {
+super(ApiKeys.ADD_PARTITIONS_TO_TXN);
+
+Map<String, List<Integer>> partitionMap = new HashMap<>();
+for (TopicPartition topicPartition : partitions) {
+String topicName = topicPartition.topic();
+
+List<Integer> subPartitions = partitionMap.getOrDefault(topicName,
+new ArrayList<>());
+subPartitions.add(topicPartition.partition());
+partitionMap.put(topicName, subPartitions);
+}
+
+AddPartitionsToTxnTopicCollection topics = new AddPartitionsToTxnTopicCollection();
+for (Map.Entry<String, List<Integer>> partitionEntry : partitionMap.entrySet()) {
+topics.add(new AddPartitionsToTxnTopic()
+   .setName(partitionEntry.getKey())
+   .setPartitions(partitionEntry.getValue()));
+}
+
+this.data = new AddPartitionsToTxnRequestData()
+.setTransactionalId(transactionalId)
+.setProducerId(producerId)
+.setProducerEpoch(producerEpoch)
+.setTopics(topics);
 }
 
 @Override
 public AddPartitionsToTxnRequest build(short version) {
-return new AddPartitionsToTxnRequest(version, transactionalId, producerId, producerEpoch, partitions);
+return new AddPartitionsToTxnRequest(data, version);
 }
 
 

[GitHub] [kafka] hachikuji commented on issue #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


hachikuji commented on issue #8509:
URL: https://github.com/apache/kafka/pull/8509#issuecomment-618147106


   retest this please







[GitHub] [kafka] ableegoldman commented on a change in pull request #8248: KAFKA-9501: convert between active and standby without closing stores

2020-04-22 Thread GitBox


ableegoldman commented on a change in pull request #8248:
URL: https://github.com/apache/kafka/pull/8248#discussion_r391363596



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java
##
@@ -132,47 +130,74 @@ private static String getTaskProducerClientId(final String threadClientId, final
 partitions
 );
 
-if (threadProducer == null) {
-final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
-final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
-producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
-log.info("Creating producer client for task {}", taskId);
-taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
-}
-
-final RecordCollector recordCollector = new RecordCollectorImpl(
-logContext,
-taskId,
-consumer,
-threadProducer != null ?
-new StreamsProducer(threadProducer, false, logContext, applicationId) :
-new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId),
-config.defaultProductionExceptionHandler(),
-EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
-streamsMetrics
-);
-
-final Task task = new StreamTask(
+createdTasks.add(createStreamTask(
 taskId,
 partitions,
-topology,
 consumer,
-config,
-streamsMetrics,
-stateDirectory,
-cache,
-time,
+logContext,
 stateManager,
-recordCollector
-);
-
-log.trace("Created task {} with assigned partitions {}", taskId, partitions);
-createdTasks.add(task);
-createTaskSensor.record();
+topology));
 }
 return createdTasks;
 }
 
+private StreamTask createStreamTask(final TaskId taskId,
+final Set<TopicPartition> partitions,
+final Consumer<byte[], byte[]> consumer,
+final LogContext logContext,
+final ProcessorStateManager stateManager,
+final ProcessorTopology topology) {
+if (threadProducer == null) {
+final String taskProducerClientId = getTaskProducerClientId(threadId, taskId);
+final Map<String, Object> producerConfigs = config.getProducerConfigs(taskProducerClientId);
+producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, applicationId + "-" + taskId);
+log.info("Creating producer client for task {}", taskId);
+taskProducers.put(taskId, clientSupplier.getProducer(producerConfigs));
+}
+
+final RecordCollector recordCollector = new RecordCollectorImpl(
+logContext,
+taskId,
+consumer,
+threadProducer != null ?
+new StreamsProducer(threadProducer, false, logContext, applicationId) :
+new StreamsProducer(taskProducers.get(taskId), true, logContext, applicationId),
+config.defaultProductionExceptionHandler(),
+EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)),
+streamsMetrics
+);
+
+final StreamTask task = new StreamTask(
+taskId,
+partitions,
+topology,
+consumer,
+config,
+streamsMetrics,
+stateDirectory,
+cache,
+time,
+stateManager,
+recordCollector
+);
+
+log.trace("Created task {} with assigned partitions {}", taskId, partitions);
+createTaskSensor.record();
+return task;
+}
+
+StreamTask convertStandbyToActive(final StandbyTask standbyTask,
+  final Set<TopicPartition> partitions,
+  final Consumer<byte[], byte[]> consumer) {
+return createStreamTask(
+standbyTask.id,
+partitions,
+consumer,
+getLogContext(standbyTask.id),
+standbyTask.stateMgr,
+standbyTask.topology);

Review comment:
   The `topology` is created but never initialized for a standby task, so we don't need to worry about closing it and can reuse it here.






[GitHub] [kafka] chia7712 commented on issue #5935: KAFKA-7665: Replace BaseConsumerRecord with ConsumerRecord in MM

2020-04-22 Thread GitBox


chia7712 commented on issue #5935:
URL: https://github.com/apache/kafka/pull/5935#issuecomment-618162318


   @huxihx Are you still working on this? I'd like to complete both the KIP and this PR :)







[GitHub] [kafka] mjsax commented on a change in pull request #8504: KAFKA-9298: reuse mapped stream error in joins

2020-04-22 Thread GitBox


mjsax commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r413380852



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
##
@@ -989,16 +994,18 @@ private void to(final TopicNameExtractor topicExtractor,
 null,
 optimizableRepartitionNodeBuilder);
 
-final OptimizableRepartitionNode optimizableRepartitionNode = optimizableRepartitionNodeBuilder.build();
-builder.addGraphNode(streamsGraphNode, optimizableRepartitionNode);
+if (repartitionNode == null || !name.equals(repartitionName)) {

Review comment:
   Hmmm... I am wondering whether just bumping the index would be sufficient, and the optimizer would then merge the nodes automatically?
   
   I am also not sure about the code structure: so far, the DSL layer does not know much about optimizations (even if we "leak" a little bit into it, as we build up the `StreamsGraphNode` graph). Wouldn't we be pushing some optimization decisions into the DSL layer, thus spreading out "optimization code"? On the other hand, inserting just one `OptimizableRepartitionNode` is much more efficient than inserting multiple ones and letting the optimizer remove them later?
   
   I am also wondering if we could do the same for other repartition topics.
   
   Last question: this method is also used for stream-table joins. If one joins a stream with two tables, would this change be backward incompatible? Or would two stream-table joins fail with the same `InvalidTopologyException`?









[GitHub] [kafka] C0urante commented on a change in pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-04-22 Thread GitBox


C0urante commented on a change in pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#discussion_r413499718



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
##
@@ -263,17 +267,20 @@ public boolean startConnector(
 Plugins.compareAndSwapLoaders(savedLoader);
 workerMetricsGroup.recordConnectorStartupFailure();
 statusListener.onFailure(connName, t);
-return false;
+onConnectorStateChange.onCompletion(t, null);
+return;
 }
+workerConnector.transitionTo(initialState, onConnectorStateChange);

Review comment:
   This part still needs some work. It's in an inconsistent state because I modified `Worker::startConnector` to have no return value and instead communicate the success or failure of connector startup through the callback, but I haven't yet taken care of several issues: possibly invoking the callback twice (once in this method, and once in the `WorkerConnector` instance), making sure to swap plugin classloaders at the right times, and preventing a possible race with the check for whether the connector already exists (which is based on whether its name is present as a key in the `connectors` map).
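A common way to rule out the double-invocation problem mentioned above is to wrap the callback so that only the first completion is forwarded. The following is a minimal sketch of that idea, not the Connect implementation: `OnceCallback` is a hypothetical name, and the `Callback` interface is defined locally to mirror the `onCompletion(error, result)` call shape in the diff rather than imported from Connect.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class OnceCallbackSketch {

    // Local interface mirroring the onCompletion(error, result) shape used in the diff;
    // defined here so the sketch is self-contained.
    interface Callback<V> {
        void onCompletion(Throwable error, V result);
    }

    // Hypothetical wrapper: forwards only the first completion and drops later ones,
    // so two code paths that both try to complete the callback cannot double-fire it.
    static final class OnceCallback<V> implements Callback<V> {
        private final Callback<V> delegate;
        private final AtomicBoolean completed = new AtomicBoolean(false);

        OnceCallback(Callback<V> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onCompletion(Throwable error, V result) {
            // compareAndSet makes the check-then-act atomic across threads
            if (completed.compareAndSet(false, true)) {
                delegate.onCompletion(error, result);
            }
        }

        boolean isCompleted() {
            return completed.get();
        }
    }

    public static void main(String[] args) {
        int[] invocations = {0};
        Callback<String> cb = new OnceCallback<>((err, res) -> invocations[0]++);
        cb.onCompletion(new RuntimeException("startup failed"), null);
        cb.onCompletion(null, "STARTED"); // ignored: the callback already completed
        System.out.println(invocations[0]); // 1
    }
}
```

An `AtomicBoolean` with `compareAndSet` is used rather than a plain flag so the guard still holds if the two completion paths run on different threads.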









[GitHub] [kafka] jiameixie commented on issue #8446: KAFKA-9804:Extract consumer configs out of PerfConfig

2020-04-22 Thread GitBox


jiameixie commented on issue #8446:
URL: https://github.com/apache/kafka/pull/8446#issuecomment-618169937


   @guozhangwang @ijuma @junrao PTAL, thanks







[GitHub] [kafka] mjsax commented on issue #8504: KAFKA-9298: reuse mapped stream error in joins

2020-04-22 Thread GitBox


mjsax commented on issue #8504:
URL: https://github.com/apache/kafka/pull/8504#issuecomment-618073161


   > Ideally, the fix should be to generate a repartition topic name each time to avoid such issues. But IMHO that ship has already sailed because by introducing a new name generation will cause compatibility issues for existing topologies. 
   
   Why is that? Because such a topology would hit the bug, it could never be deployed, and thus nobody can actually be running such a topology? In fact, shouldn't we "burn" an index even if a name is provided (IIRC, we do this in some cases)?
   
   I agree, though, that merging repartition topics (as proposed in (1)) should be done if possible (it's a historic artifact that we did not merge them in the past, and IMHO we should not make the same mistake again?).
   
   For (2), it's a tricky question because the different names are used for different stores and changelog topics (i.e., their main purpose?). It seems to be a "nasty side effect" if we ended up with two repartition topics for this case? Of course, given the new `repartition()` operator, a user can work around it by calling it after `map()` and before calling `join()`. Just brainstorming here about what the impact could be and what tradeoff we want to pick.
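To make the "burn an index" idea concrete, here is a minimal sketch of a name generator that consumes an index on every call, even when a user-supplied name is used, so that naming one operator does not shift the generated names of later operators. `NameGenerator` and `nextName` are hypothetical; the zero-padded suffix only imitates the look of generated Streams names and is not taken from the Streams source.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class NameIndexSketch {

    // Hypothetical generator: every call consumes an index, even when the
    // caller supplies its own name, so later generated names stay stable.
    static final class NameGenerator {
        private final AtomicInteger index = new AtomicInteger(0);

        String nextName(String prefix, String userProvidedName) {
            int i = index.getAndIncrement(); // always "burn" the index
            if (userProvidedName != null) {
                return userProvidedName;
            }
            return prefix + String.format("%010d", i);
        }
    }

    public static void main(String[] args) {
        NameGenerator gen = new NameGenerator();
        System.out.println(gen.nextName("KSTREAM-MAP-", null));       // KSTREAM-MAP-0000000000
        System.out.println(gen.nextName("KSTREAM-JOIN-", "my-join")); // my-join
        System.out.println(gen.nextName("KSTREAM-MAP-", null));       // KSTREAM-MAP-0000000002
    }
}
```

Because the second call still consumed index 1, the third generated name is the same whether or not the user named the join, which is the compatibility property the comment is after.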







[GitHub] [kafka] guozhangwang commented on issue #8534: KAFKA-9512: Backport Flaky Test LagFetchIntegrationTest.shouldFetchLagsDuringRestoration

2020-04-22 Thread GitBox


guozhangwang commented on issue #8534:
URL: https://github.com/apache/kafka/pull/8534#issuecomment-618104185


   Thanks @vinothchandar !!







[GitHub] [kafka] guozhangwang commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-22 Thread GitBox


guozhangwang commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r413417021



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+ */
+private void validateOffsetsAsync(Map partitionsToValidate) {
+final Map> 
regrouped =
+regroupFetchPositionsByLeader(partitionsToValidate);
+
+regrouped.forEach((node, fetchPositions) -> {
+if (node.isEmpty()) {
+metadata.requestUpdate();
+return;
+}
+
+NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+if (nodeApiVersions == null) {
+client.tryConnect(node);
+return;
+}
+
+if (!hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+  "support the required protocol version (introduced in Kafka 2.3)",
+fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+// We need to get the client epoch state before sending out the leader epoch request, and use it to
+// decide whether we need to validate offsets.
+if (!metadata.hasReliableLeaderEpochs()) {
+log.debug("Skipping validation of fetch offsets for partitions {} since the provided leader broker " +
+  "is not reliable", fetchPositions.keySet());
+completeAllValidations(fetchPositions);
+return;
+}
+
+subscriptions.setNextAllowedRetry(fetchPositions.keySet(), time.milliseconds() + requestTimeoutMs);
+
+RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future =
+offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions);
+
+future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
+@Override
+public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetsResult) {
+Map truncationWithoutResetPolicy = new HashMap<>();
+if (!offsetsResult.partitionsToRetry().isEmpty()) {
+subscriptions.setNextAllowedRetry(offsetsResult.partitionsToRetry(), time.milliseconds() + retryBackoffMs);
+metadata.requestUpdate();
+}
+
+// For each OffsetsForLeader response, check if the end-offset is lower than our current offset
+// for the partition. If so, it means we have experienced log truncation and need to reposition
+// that partition's offset.
+//
+// In addition, check whether the returned offset and epoch are valid. If not, then we should treat
+// it as out of range and update metadata for rediscovery.
+offsetsResult.endOffsets().forEach((respTopicPartition, respEndOffset) -> {
+if (respEndOffset.hasUndefinedEpochOrOffset()) {
+// Should attempt to find the new leader in the next try.
+log.debug("Requesting metadata update for partition {} due to undefined epoch or offset {}",

Review comment:
   nit: `... or offset {} from OffsetsForLeaderEpoch response`

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
##
@@ -86,4 +84,9 @@ public boolean equals(Object o) {
 public int hashCode() {
 return Objects.hash(error, leaderEpoch, endOffset);
 }
+
+public boolean hasUndefinedEpochOrOffset() {
+return this.endOffset == UNDEFINED_EPOCH_OFFSET ||

Review comment:
   For my own understanding: if `endOffset` is UNDEFINED, should the epoch always be UNDEFINED too? If that's the case, can we just rely on `leaderEpoch` alone?
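The question above can be checked mechanically: the two predicates below disagree only when the end offset is undefined but the epoch is defined. This is a sketch with a hypothetical `EndOffset` class, not the real `EpochEndOffset`. The sentinel values (both `-1`) are an assumption of the sketch, and so is the epoch clause of the combined check, since the diff excerpt is cut off after the `endOffset` comparison.

```java
public class EndOffsetSketch {

    // Assumed sentinel values for this sketch; both are taken to be -1.
    static final int UNDEFINED_EPOCH = -1;
    static final long UNDEFINED_EPOCH_OFFSET = -1L;

    // Hypothetical stand-in for an epoch/end-offset pair from an OffsetsForLeaderEpoch response.
    static final class EndOffset {
        final int leaderEpoch;
        final long endOffset;

        EndOffset(int leaderEpoch, long endOffset) {
            this.leaderEpoch = leaderEpoch;
            this.endOffset = endOffset;
        }

        // Defensive form: unusable if either field is undefined.
        boolean hasUndefinedEpochOrOffset() {
            return endOffset == UNDEFINED_EPOCH_OFFSET || leaderEpoch == UNDEFINED_EPOCH;
        }

        // Simplified form the question suggests: only equivalent if an undefined
        // end offset always comes with an undefined epoch.
        boolean hasUndefinedEpoch() {
            return leaderEpoch == UNDEFINED_EPOCH;
        }
    }

    public static void main(String[] args) {
        EndOffset both = new EndOffset(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET);
        EndOffset offsetOnly = new EndOffset(5, UNDEFINED_EPOCH_OFFSET);
        System.out.println(both.hasUndefinedEpochOrOffset() + " " + both.hasUndefinedEpoch());             // true true
        System.out.println(offsetOnly.hasUndefinedEpochOrOffset() + " " + offsetOnly.hasUndefinedEpoch()); // true false
    }
}
```

If a broker can never return a defined epoch alongside an undefined offset, the simpler epoch-only check is equivalent; otherwise the combined check is the safer choice.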

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -498,6 +498,104 @@ public void validateOffsetsIfNeeded() {
 validateOffsetsAsync(partitionsToValidate);
 }
 
+/**
+ * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
+ * with the epoch less than or equal to the epoch the partition last saw.
+ *
+ * Requests are grouped by Node for efficiency.
+

[GitHub] [kafka] jiameixie commented on issue #8489: KAFKA-9857:Failed to build image ducker-ak-openjdk-8 on arm

2020-04-22 Thread GitBox


jiameixie commented on issue #8489:
URL: https://github.com/apache/kafka/pull/8489#issuecomment-618132157


   @guozhangwang @ijuma @junrao PTAL, thanks







[GitHub] [kafka] abbccdda commented on a change in pull request #8326: KAFKA-8639: Replace AddPartitionsToTxn with Automated Protocol

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8326:
URL: https://github.com/apache/kafka/pull/8326#discussion_r413473580



##
File path: 
core/src/test/scala/unit/kafka/server/AddPartitionsToTxnRequestServerTest.scala
##
@@ -27,9 +27,9 @@ import org.junit.{Before, Test}
 
 import scala.jdk.CollectionConverters._
 
-class AddPartitionsToTxnRequestTest extends BaseRequestTest {
-  private val topic1 = "foobartopic"
-  val numPartitions = 3
+class AddPartitionsToTxnRequestServerTest extends BaseRequestTest {

Review comment:
   This was renamed because its name conflicts with the newly added `AddPartitionsToTxnRequestTest`; just want to clarify that.









[GitHub] [kafka] vinothchandar commented on a change in pull request #8462: KAFKA-9846: Filter active tasks for running state in KafkaStreams#allLocalStorePartitionLags()

2020-04-21 Thread GitBox


vinothchandar commented on a change in pull request #8462:
URL: https://github.com/apache/kafka/pull/8462#discussion_r412690139



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##
@@ -295,9 +295,10 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store
 }
 });
 
-restartedStreams.start();
+// Wait till the restarted instance reaches running, after restoring
+startApplicationAndWaitUntilRunning(Collections.singletonList(restartedStreams),
 Duration.ofSeconds(60));

Review comment:
   This is the actual fix for the flaky test.
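The fix replaces a bare `start()` with waiting until the restarted instance is running before the test continues. The general pattern is to poll a condition until it holds or a timeout expires. Below is a minimal, hypothetical `waitUntil` helper illustrating it; it is not the helper used in the Kafka test suite.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class WaitUntilSketch {

    // Hypothetical helper: polls the condition until it holds or the timeout
    // elapses, instead of asserting immediately after an asynchronous start().
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        // Compare via subtraction so the check stays correct if nanoTime() wraps.
        while (deadline - System.nanoTime() > 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        // One last check so a condition that became true right at the deadline is not missed.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) {
        AtomicInteger polls = new AtomicInteger();
        // Simulated state that only reports "running" from the third poll onwards.
        boolean reached = waitUntil(() -> polls.incrementAndGet() >= 3,
                Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println(reached); // true
    }
}
```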
   
   









[GitHub] [kafka] dajac commented on a change in pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-04-22 Thread GitBox


dajac commented on a change in pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#discussion_r412287554



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map());
 
-Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>();
 for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
 TopicPartitionReplica replica = entry.getKey();
 String logDir = entry.getValue();
 int brokerId = replica.brokerId();
 TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());

Review comment:
   `topicPartition` is not used except for getting the topic and the 
partition above. `replica.topic()` and `replica.partition()` could be directly 
used instead.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map());
 
-Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>();
 for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
 TopicPartitionReplica replica = entry.getKey();
 String logDir = entry.getValue();
 int brokerId = replica.brokerId();
 TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());
-if (!replicaAssignmentByBroker.containsKey(brokerId))
-replicaAssignmentByBroker.put(brokerId, new HashMap<>());
-replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
+AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId,

Review comment:
   nit: Could we rename `value` to something like `alterReplicaLogDirs`?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map());
 
-Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>();
+Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>();
 for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) {
 TopicPartitionReplica replica = entry.getKey();
 String logDir = entry.getValue();
 int brokerId = replica.brokerId();
 TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition());
-if (!replicaAssignmentByBroker.containsKey(brokerId))
-replicaAssignmentByBroker.put(brokerId, new HashMap<>());
-replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir);
+AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId,
+key -> new AlterReplicaLogDirsRequestData());
+AlterReplicaLogDir alterReplicaLogDir = value.dirs().find(logDir);
+if (alterReplicaLogDir == null) {
+alterReplicaLogDir = new AlterReplicaLogDir();
+alterReplicaLogDir.setPath(logDir);
+value.dirs().add(alterReplicaLogDir);
+}
+AlterReplicaLogDirTopic alterReplicaLogDirTopic = alterReplicaLogDir.topics().find(topicPartition.topic());
+if (alterReplicaLogDirTopic == null) {
+alterReplicaLogDirTopic = new AlterReplicaLogDirTopic();
+alterReplicaLogDir.topics().add(alterReplicaLogDirTopic);
+}
+alterReplicaLogDirTopic.setName(topicPartition.topic())

Review comment:
   `setName` could be done only once within the if statement.
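Taken together, the three suggestions above (use `replica.topic()` and `replica.partition()` directly, give the per-broker value a descriptive name, and set the topic name only once) amount to building a nested grouping with one `computeIfAbsent` per level. The sketch below shows that shape with plain Java maps (broker id, then log dir, then topic, then partitions) and a hypothetical `Replica` stand-in for `TopicPartitionReplica`. It deliberately avoids the generated `AlterReplicaLogDirsRequestData` classes, so it makes no claims about their API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplicaLogDirGrouping {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartitionReplica.
    static final class Replica {
        final String topic;
        final int partition;
        final int brokerId;

        Replica(String topic, int partition, int brokerId) {
            this.topic = topic;
            this.partition = partition;
            this.brokerId = brokerId;
        }
    }

    // brokerId -> logDir -> topic -> partitions. Each level is created lazily with
    // computeIfAbsent, and the topic is read straight from the replica instead of
    // going through an intermediate TopicPartition.
    static Map<Integer, Map<String, Map<String, List<Integer>>>> group(Map<Replica, String> assignment) {
        Map<Integer, Map<String, Map<String, List<Integer>>>> byBroker = new LinkedHashMap<>();
        for (Map.Entry<Replica, String> entry : assignment.entrySet()) {
            Replica replica = entry.getKey();
            String logDir = entry.getValue();
            byBroker.computeIfAbsent(replica.brokerId, b -> new LinkedHashMap<>())
                    .computeIfAbsent(logDir, d -> new LinkedHashMap<>())
                    .computeIfAbsent(replica.topic, t -> new ArrayList<>())
                    .add(replica.partition);
        }
        return byBroker;
    }

    public static void main(String[] args) {
        Map<Replica, String> assignment = new LinkedHashMap<>();
        assignment.put(new Replica("foo", 0, 1), "/data/a");
        assignment.put(new Replica("foo", 1, 1), "/data/a");
        assignment.put(new Replica("bar", 0, 2), "/data/b");
        System.out.println(group(assignment)); // {1={/data/a={foo=[0, 1]}}, 2={/data/b={bar=[0]}}}
    }
}
```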

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
##
@@ -17,122 +17,53 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import 

[GitHub] [kafka] dajac commented on a change in pull request #8509: KAFKA-9839: Broker should accept control requests with newer broker epoch

2020-04-22 Thread GitBox


dajac commented on a change in pull request #8509:
URL: https://github.com/apache/kafka/pull/8509#discussion_r412807377



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1553,6 +1552,179 @@ class KafkaApisTest {
 assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testUpdateMetadataRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithNewerBrokerEpochIsValid(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+  }
+
+  @Test
+  def testUpdateMetadataRequestWithStaleBrokerEpochIsRejected(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testUpdateMetadataRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+val updateMetadataRequest = createBasicMetadataRequest("topicA", 1, brokerEpochInRequest)
+val request = buildRequest(updateMetadataRequest)
+
+val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+
+EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+EasyMock.expect(replicaManager.maybeUpdateMetadataCache(
+  EasyMock.eq(request.context.correlationId),
+  EasyMock.anyObject()
+)).andStubReturn(
+  Seq()
+)
+
+EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+EasyMock.replay(replicaManager, controller, requestChannel)
+
+createKafkaApis().handleUpdateMetadataRequest(request)
+val updateMetadataResponse = readResponse(ApiKeys.UPDATE_METADATA, updateMetadataRequest, capturedResponse)
+  .asInstanceOf[UpdateMetadataResponse]
+assertEquals(expectedError, updateMetadataResponse.error())
+EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithNewerBrokerEpochIsValid(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch + 1, Errors.NONE)
+  }
+
+  @Test
+  def testLeaderAndIsrRequestWithStaleBrokerEpochIsRejected(): Unit = {
+val currentBrokerEpoch = 1239875L
+testLeaderAndIsrRequest(currentBrokerEpoch, currentBrokerEpoch - 1, Errors.STALE_BROKER_EPOCH)
+  }
+
+  def testLeaderAndIsrRequest(currentBrokerEpoch: Long, brokerEpochInRequest: Long, expectedError: Errors): Unit = {
+val controllerId = 2
+val controllerEpoch = 6
+val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+val partitionStates = Seq(
+  new LeaderAndIsrRequestData.LeaderAndIsrPartitionState()
+.setTopicName("topicW")
+.setPartitionIndex(1)
+.setControllerEpoch(1)
+.setLeader(0)
+.setLeaderEpoch(1)
+.setIsr(asList(0, 1))
+.setZkVersion(2)
+.setReplicas(asList(0, 1, 2))
+.setIsNew(false)
+).asJava
+val leaderAndIsrRequest = new LeaderAndIsrRequest.Builder(
+  ApiKeys.LEADER_AND_ISR.latestVersion,
+  controllerId,
+  controllerEpoch,
+  brokerEpochInRequest,
+  partitionStates,
+  asList(new Node(0, "host0", 9090), new Node(1, "host1", 9091))
+).build()
+val request = buildRequest(leaderAndIsrRequest)
+val response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+  .setErrorCode(Errors.NONE.code)
+  .setPartitionErrors(asList()))
+
+EasyMock.expect(controller.brokerEpoch).andStubReturn(currentBrokerEpoch)
+EasyMock.expect(replicaManager.becomeLeaderOrFollower(
+  EasyMock.eq(request.context.correlationId),
+  EasyMock.anyObject(),
+  EasyMock.anyObject()
+)).andStubReturn(
+  response
+)
+
+EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+EasyMock.replay(replicaManager, controller, requestChannel)
+
+createKafkaApis().handleLeaderAndIsrRequest(request)
+val leaderAndIsrResponse = readResponse(ApiKeys.LEADER_AND_ISR, leaderAndIsrRequest, capturedResponse)
+  .asInstanceOf[LeaderAndIsrResponse]
+assertEquals(expectedError, leaderAndIsrResponse.error())
+EasyMock.verify(replicaManager)
+  }
+
+  @Test
+  def testStopReplicaRequestWithCurrentBrokerEpoch(): Unit = {
+val currentBrokerEpoch = 1239875L
+testUpdateMetadataRequest(currentBrokerEpoch, currentBrokerEpoch, Errors.NONE)

Review comment:
   This and the two above should use `testStopReplicaRequest` instead of 
`testUpdateMetadataRequest`.

##
File path: 

[GitHub] [kafka] iceChen8123 commented on issue #233: KAFKA-2419; Garbage collect unused sensors

2020-04-21 Thread GitBox


iceChen8123 commented on issue #233:
URL: https://github.com/apache/kafka/pull/233#issuecomment-617512892


   Excuse me, has this bug been fixed yet? I can see an `ExpireSensorTask` in `Metrics`, but it doesn't seem to be used anywhere. I also see more than 800,000 sensors in my application; they are in the old generation and can't be garbage-collected.
   I'm using kafka-clients version 2.1.1.
   
   
   







[GitHub] [kafka] tombentley commented on a change in pull request #8417: KAFKA-8955: Add an AbstractResponse#errorCounts(Stream) and tidy

2020-04-22 Thread GitBox


tombentley commented on a change in pull request #8417:
URL: https://github.com/apache/kafka/pull/8417#discussion_r412754269



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
##
@@ -57,6 +59,10 @@ public ByteBuffer serialize(ApiKeys apiKey, short version, int correlationId) {
 return Collections.singletonMap(error, 1);
 }
 
+protected Map errorCounts(Stream errors) {
+return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
+}
+
 protected Map errorCounts(Collection errors) {

Review comment:
   @chia7712 you're right, but the only two remaining callers of this 
method are for RPCs which haven't been converted to the message generator. 
There's little benefit to changing them when there are already PRs for 
converting those RPCs, and I'm planning to remove this method entirely when 
those PRs have been merged. I guess I could mark this method as `@Deprecated`.
   
   Relatedly, there's only a single caller of the `apiErrorCounts(Map errors)` method, which is also for a not-yet-converted RPC that already has a PR. 
If this gets merged first, I'll be able to remove `apiErrorCounts(Map errors)` in that PR. 
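A self-contained Java illustration of the `errorCounts(Stream)` helper from the diff above, using a stand-in `Err` enum in place of Kafka's `Errors` (an assumption made only so the example runs on its own). It also shows one way a `Collection` overload can simply delegate to the `Stream` version while it is still needed:

```java
import java.util.Collection;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class ErrorCounting {
    // Illustrative stand-in for org.apache.kafka.common.protocol.Errors.
    enum Err { NONE, NOT_LEADER_FOR_PARTITION, UNKNOWN_TOPIC_OR_PARTITION }

    // Stream variant: group identical errors and count the members of each group.
    static Map<Err, Integer> errorCounts(Stream<Err> errors) {
        return errors.collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
    }

    // Collection variant: delegates to the Stream variant, so the logic lives in one place.
    static Map<Err, Integer> errorCounts(Collection<Err> errors) {
        return errorCounts(errors.stream());
    }
}
```

Errors that never occur are simply absent from the resulting map rather than mapped to zero.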

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsResponse.java
##
@@ -52,10 +51,9 @@ protected Struct toStruct(short version) {
 @Override
 public Map errorCounts() {
 Map counts = new HashMap<>();

Review comment:
   @chia7712 what I've tried to do in this PR so far is:
   
   * Change `for` loops + `updateErrorCounts` calls to use `forEach` consistently
   * Change calls to `errorCounts(Collection)` to `errorCounts(Stream)`
   
   I've not tried to change all code to use either `forEach` or 
`errorCounts(Stream)`. Obviously we could do that, but @ijuma seems happy 
enough with continuing to have these two ways to do it.
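The two styles mentioned above, side by side in a minimal Java sketch. The `updateErrorCounts` here is a hypothetical helper written with `Map.merge`, not necessarily how Kafka's method is implemented, and `Err` stands in for Kafka's `Errors`; the point is only that both styles produce the same counts:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class ErrorCountStyles {
    // Illustrative stand-in for Kafka's Errors enum.
    enum Err { NONE, LEADER_NOT_AVAILABLE, INVALID_TOPIC_EXCEPTION }

    // Hypothetical helper in the spirit of updateErrorCounts: bump the count for one error.
    static void updateErrorCounts(Map<Err, Integer> counts, Err error) {
        counts.merge(error, 1, Integer::sum);
    }

    // Style 1: forEach over the results, accumulating into a mutable map.
    static Map<Err, Integer> countWithForEach(List<Err> errors) {
        Map<Err, Integer> counts = new HashMap<>();
        errors.forEach(e -> updateErrorCounts(counts, e));
        return counts;
    }

    // Style 2: build the map in one pass with a collector, as errorCounts(Stream) does.
    static Map<Err, Integer> countWithStream(List<Err> errors) {
        return errors.stream()
                .collect(Collectors.groupingBy(e -> e, Collectors.summingInt(e -> 1)));
    }
}
```

The `forEach` style fits code that already iterates over per-partition results, while the collector style avoids managing a mutable map by hand.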









[GitHub] [kafka] zshuo commented on issue #8224: KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap.

2020-04-23 Thread GitBox


zshuo commented on issue #8224:
URL: https://github.com/apache/kafka/pull/8224#issuecomment-618311862


   ok to test







[GitHub] [kafka] cmccabe commented on a change in pull request #8527: Remove dependencies on deprecated --zookeeper command flags in junit tests

2020-04-23 Thread GitBox


cmccabe commented on a change in pull request #8527:
URL: https://github.com/apache/kafka/pull/8527#discussion_r413720950



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -293,13 +293,6 @@ class TopicCommandWithAdminClientTest extends KafkaServerTestHarness with Loggin
 }
   }
 
-  @Test
-  def testCreateWithUnspecifiedReplicationFactorAndPartitionsWithZkClient(): Unit = {

Review comment:
   It's weird that this is even in this file. This should be moved to 
`TopicCommandWithZKClientTest.scala`, I think.









[GitHub] [kafka] mimaison commented on issue #8224: KAFKA-9704: Fix the issue z/OS won't let us resize file when mmap.

2020-04-23 Thread GitBox


mimaison commented on issue #8224:
URL: https://github.com/apache/kafka/pull/8224#issuecomment-618329701


   ok to test







[GitHub] [kafka] cmccabe commented on issue #8527: Remove dependencies on deprecated --zookeeper command flags in junit tests

2020-04-23 Thread GitBox


cmccabe commented on issue #8527:
URL: https://github.com/apache/kafka/pull/8527#issuecomment-618337731


   ok to test







[GitHub] [kafka] C0urante commented on issue #8069: KAFKA-9374: Make connector interactions asynchronous

2020-04-22 Thread GitBox


C0urante commented on issue #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-618097440


   @ncliang I've made some updates to the PR and rebased on the latest trunk; 
would you be willing to do another pass?







[GitHub] [kafka] abbccdda commented on a change in pull request #8486: KAFKA-9840: Skip End Offset validation when the leader epoch is not reliable

2020-04-22 Thread GitBox


abbccdda commented on a change in pull request #8486:
URL: https://github.com/apache/kafka/pull/8486#discussion_r413468323



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
##
@@ -85,10 +85,6 @@ protected OffsetForEpochResult handleResponse(
 case KAFKA_STORAGE_ERROR:
 case OFFSET_NOT_AVAILABLE:
 case LEADER_NOT_AVAILABLE:
-logger().debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.",

Review comment:
   It is exactly the same as the handling of the subsequent cases.
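The reviewer's point, restated as a minimal Java sketch: when several `case` labels need identical handling, stacking the labels lets them fall through to one shared body, so a duplicated log-and-retry block can be deleted. The enum, the labels after `LEADER_NOT_AVAILABLE`, and the returned strings are illustrative assumptions, not the actual contents of `OffsetsForLeaderEpochClient`:

```java
final class RetriableErrorHandling {
    // Illustrative subset of error codes; the real client uses Kafka's Errors enum.
    enum Err {
        NONE, KAFKA_STORAGE_ERROR, OFFSET_NOT_AVAILABLE, LEADER_NOT_AVAILABLE,
        FENCED_LEADER_EPOCH, UNKNOWN_LEADER_EPOCH, TOPIC_AUTHORIZATION_FAILED
    }

    // Returns a description of the chosen action; a real client would log and schedule a retry.
    static String handle(String partition, Err error) {
        switch (error) {
            case NONE:
                return "ok";
            // Stacked labels fall through, so every retriable error shares this one body
            // instead of repeating the same logging and retry logic per label.
            case KAFKA_STORAGE_ERROR:
            case OFFSET_NOT_AVAILABLE:
            case LEADER_NOT_AVAILABLE:
            case FENCED_LEADER_EPOCH:
            case UNKNOWN_LEADER_EPOCH:
                return "retry " + partition + " after " + error;
            default:
                return "fail " + partition + " with " + error;
        }
    }
}
```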









[GitHub] [kafka] ijuma commented on issue #8533: KAFKA-9589: Fixed bug in V2 log validator tests

2020-04-22 Thread GitBox


ijuma commented on issue #8533:
URL: https://github.com/apache/kafka/pull/8533#issuecomment-618192475


   ok to test







[GitHub] [kafka] mimaison commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

2020-04-23 Thread GitBox


mimaison commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r413717848



##
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##
@@ -1553,6 +1554,50 @@ class KafkaApisTest {
 assertEquals(Errors.INVALID_REQUEST, response.error)
   }
 
+  @Test
+  def testListGroupsRequest(): Unit = {
+val overviews = List(
+  GroupOverview("group1", "protocol1", "Stable"),
+  GroupOverview("goupp2", "qwerty", "Empty")
+)
+val response = listGroupRequest(Option.empty, overviews)
+assertEquals(2, response.data.groups.size)
+assertEquals("", response.data.groups.get(0).groupState)
+assertEquals("", response.data.groups.get(1).groupState)

Review comment:
   Yes that's a good idea









[GitHub] [kafka] cmccabe commented on a change in pull request #8527: Remove dependencies on deprecated --zookeeper command flags in junit tests

2020-04-23 Thread GitBox


cmccabe commented on a change in pull request #8527:
URL: https://github.com/apache/kafka/pull/8527#discussion_r413719892



##
File path: 
core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
##
@@ -255,7 +225,7 @@ class ReassignPartitionsCommandArgsTest {
   @Test
   def shouldNotAllowBrokersListWithVerifyOption(): Unit = {
 val args = Array(
-  "--zookeeper", "localhost:1234",
+  "--bootstrap-server", "localhost:1234",

Review comment:
   This is a good change, since the test is not specifically testing the 
legacy mode (so it should use the new mode). Keep this one.

##
File path: 
core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
##
@@ -275,7 +245,7 @@ class ReassignPartitionsCommandArgsTest {
   @Test
   def shouldNotAllowTopicsOptionWithVerify(): Unit = {
 val args = Array(
-  "--zookeeper", "localhost:1234",
+  "--bootstrap-server", "localhost:1234",

Review comment:
   keep this change

##
File path: 
core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala
##
@@ -265,7 +235,7 @@ class ReassignPartitionsCommandArgsTest {
   @Test
   def shouldNotAllowThrottleWithVerifyOption(): Unit = {
 val args = Array(
-  "--zookeeper", "localhost:1234",
+  "--bootstrap-server", "localhost:1234",

Review comment:
   keep this change








